1/*
2 Simple DirectMedia Layer
3 Copyright (C) 1997-2025 Sam Lantinga <slouken@libsdl.org>
4
5 This software is provided 'as-is', without any express or implied
6 warranty. In no event will the authors be held liable for any damages
7 arising from the use of this software.
8
9 Permission is granted to anyone to use this software for any purpose,
10 including commercial applications, and to alter it and redistribute it
11 freely, subject to the following restrictions:
12
13 1. The origin of this software must not be misrepresented; you must not
14 claim that you wrote the original software. If you use this software
15 in a product, an acknowledgment in the product documentation would be
16 appreciated but is not required.
17 2. Altered source versions must be plainly marked as such, and must not be
18 misrepresented as being the original software.
19 3. This notice may not be removed or altered from any source distribution.
20*/
21
22// The generic backend uses a threadpool to block on synchronous i/o.
23// This is not ideal, it's meant to be used if there isn't a platform-specific
24// backend that can do something more efficient!
25
26#include "SDL_internal.h"
27#include "../SDL_sysasyncio.h"
28
29// on Emscripten without threads, async i/o is synchronous. Sorry. Almost
30// everything is MEMFS, so it's just a memcpy anyhow, and the Emscripten
31// filesystem APIs don't offer async. In theory, directly accessing
32// persistent storage _does_ offer async APIs at the browser level, but
33// that's not exposed in Emscripten's filesystem abstraction.
34#if defined(SDL_PLATFORM_EMSCRIPTEN) && !defined(__EMSCRIPTEN_PTHREADS__)
35#define SDL_ASYNCIO_USE_THREADPOOL 0
36#else
37#define SDL_ASYNCIO_USE_THREADPOOL 1
38#endif
39
40typedef struct GenericAsyncIOQueueData
41{
42 SDL_Mutex *lock;
43 SDL_Condition *condition;
44 SDL_AsyncIOTask completed_tasks;
45} GenericAsyncIOQueueData;
46
47typedef struct GenericAsyncIOData
48{
49 SDL_Mutex *lock; // !!! FIXME: we can skip this lock if we have an equivalent of pread/pwrite
50 SDL_IOStream *io;
51} GenericAsyncIOData;
52
53static void AsyncIOTaskComplete(SDL_AsyncIOTask *task)
54{
55 SDL_assert(task->queue);
56 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) task->queue->userdata;
57 SDL_LockMutex(data->lock);
58 LINKED_LIST_PREPEND(task, data->completed_tasks, queue);
59 SDL_SignalCondition(data->condition); // wake a thread waiting on the queue.
60 SDL_UnlockMutex(data->lock);
61}
62
63// synchronous i/o is offloaded onto the threadpool. This function does the threaded work.
64// This is called directly, without a threadpool, if !SDL_ASYNCIO_USE_THREADPOOL.
65static void SynchronousIO(SDL_AsyncIOTask *task)
66{
67 SDL_assert(task->result != SDL_ASYNCIO_CANCELED); // shouldn't have gotten in here if canceled!
68
69 GenericAsyncIOData *data = (GenericAsyncIOData *) task->asyncio->userdata;
70 SDL_IOStream *io = data->io;
71 const size_t size = (size_t) task->requested_size;
72 void *ptr = task->buffer;
73
74 // this seek won't work if two tasks are reading from the same file at the same time,
75 // so we lock here. This makes multiple reads from a single file serialize, but different
76 // files will still run in parallel. An app can also open the same file twice to avoid this.
77 SDL_LockMutex(data->lock);
78 if (task->type == SDL_ASYNCIO_TASK_CLOSE) {
79 bool okay = true;
80 if (task->flush) {
81 okay = SDL_FlushIO(data->io);
82 }
83 okay = SDL_CloseIO(data->io) && okay;
84 task->result = okay ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE;
85 } else if (SDL_SeekIO(io, (Sint64) task->offset, SDL_IO_SEEK_SET) < 0) {
86 task->result = SDL_ASYNCIO_FAILURE;
87 } else {
88 const bool writing = (task->type == SDL_ASYNCIO_TASK_WRITE);
89 task->result_size = (Uint64) (writing ? SDL_WriteIO(io, ptr, size) : SDL_ReadIO(io, ptr, size));
90 if (task->result_size == task->requested_size) {
91 task->result = SDL_ASYNCIO_COMPLETE;
92 } else {
93 if (writing) {
94 task->result = SDL_ASYNCIO_FAILURE; // it's always a failure on short writes.
95 } else {
96 const SDL_IOStatus status = SDL_GetIOStatus(io);
97 SDL_assert(status != SDL_IO_STATUS_READY); // this should have either failed or been EOF.
98 SDL_assert(status != SDL_IO_STATUS_NOT_READY); // these should not be non-blocking reads!
99 task->result = (status == SDL_IO_STATUS_EOF) ? SDL_ASYNCIO_COMPLETE : SDL_ASYNCIO_FAILURE;
100 }
101 }
102 }
103 SDL_UnlockMutex(data->lock);
104
105 AsyncIOTaskComplete(task);
106}
107
108#if SDL_ASYNCIO_USE_THREADPOOL
109static SDL_InitState threadpool_init;
110static SDL_Mutex *threadpool_lock = NULL;
111static bool stop_threadpool = false;
112static SDL_AsyncIOTask threadpool_tasks;
113static SDL_Condition *threadpool_condition = NULL;
114static int max_threadpool_threads = 0;
115static int running_threadpool_threads = 0;
116static int idle_threadpool_threads = 0;
117static int threadpool_threads_spun = 0;
118
119static int SDLCALL AsyncIOThreadpoolWorker(void *data)
120{
121 SDL_LockMutex(threadpool_lock);
122
123 while (!stop_threadpool) {
124 SDL_AsyncIOTask *task = LINKED_LIST_START(threadpool_tasks, threadpool);
125 if (!task) {
126 // if we go 30 seconds without a new task, terminate unless we're the only thread left.
127 idle_threadpool_threads++;
128 const bool rc = SDL_WaitConditionTimeout(threadpool_condition, threadpool_lock, 30000);
129 idle_threadpool_threads--;
130
131 if (!rc) {
132 // decide if we have too many idle threads, and if so, quit to let thread pool shrink when not busy.
133 if (idle_threadpool_threads) {
134 break;
135 }
136 }
137
138 continue;
139 }
140
141 LINKED_LIST_UNLINK(task, threadpool);
142
143 SDL_UnlockMutex(threadpool_lock);
144
145 // bookkeeping is done, so we drop the mutex and fire the work.
146 SynchronousIO(task);
147
148 SDL_LockMutex(threadpool_lock); // take the lock again and see if there's another task (if not, we'll wait on the Condition).
149 }
150
151 running_threadpool_threads--;
152
153 // this is kind of a hack, but this lets us reuse threadpool_condition to block on shutdown until all threads have exited.
154 if (stop_threadpool) {
155 SDL_BroadcastCondition(threadpool_condition);
156 }
157
158 SDL_UnlockMutex(threadpool_lock);
159
160 return 0;
161}
162
163static bool MaybeSpinNewWorkerThread(void)
164{
165 // if all existing threads are busy and the pool of threads isn't maxed out, make a new one.
166 if ((idle_threadpool_threads == 0) && (running_threadpool_threads < max_threadpool_threads)) {
167 char threadname[32];
168 SDL_snprintf(threadname, sizeof (threadname), "SDLasyncio%d", threadpool_threads_spun);
169 SDL_Thread *thread = SDL_CreateThread(AsyncIOThreadpoolWorker, threadname, NULL);
170 if (thread == NULL) {
171 return false;
172 }
173 SDL_DetachThread(thread); // these terminate themselves when idle too long, so we never WaitThread.
174 running_threadpool_threads++;
175 threadpool_threads_spun++;
176 }
177 return true;
178}
179
180static void QueueAsyncIOTask(SDL_AsyncIOTask *task)
181{
182 SDL_assert(task != NULL);
183
184 SDL_LockMutex(threadpool_lock);
185
186 if (stop_threadpool) { // just in case.
187 task->result = SDL_ASYNCIO_CANCELED;
188 AsyncIOTaskComplete(task);
189 } else {
190 LINKED_LIST_PREPEND(task, threadpool_tasks, threadpool);
191 MaybeSpinNewWorkerThread(); // okay if this fails or the thread pool is maxed out. Something will get there eventually.
192
193 // tell idle threads to get to work.
194 // This is a broadcast because we want someone from the thread pool to wake up, but
195 // also shutdown might also be blocking on this. One of the threads will grab
196 // it, the others will go back to sleep.
197 SDL_BroadcastCondition(threadpool_condition);
198 }
199
200 SDL_UnlockMutex(threadpool_lock);
201}
202
203// We don't initialize async i/o at all until it's used, so
204// JUST IN CASE two things try to start at the same time,
205// this will make sure everything gets the same mutex.
206static bool PrepareThreadpool(void)
207{
208 bool okay = true;
209 if (SDL_ShouldInit(&threadpool_init)) {
210 max_threadpool_threads = (SDL_GetNumLogicalCPUCores() * 2) + 1; // !!! FIXME: this should probably have a hint to override.
211 max_threadpool_threads = SDL_clamp(max_threadpool_threads, 1, 8); // 8 is probably more than enough.
212
213 okay = (okay && ((threadpool_lock = SDL_CreateMutex()) != NULL));
214 okay = (okay && ((threadpool_condition = SDL_CreateCondition()) != NULL));
215 okay = (okay && MaybeSpinNewWorkerThread()); // make sure at least one thread is going, since we'll need it.
216
217 if (!okay) {
218 if (threadpool_condition) {
219 SDL_DestroyCondition(threadpool_condition);
220 threadpool_condition = NULL;
221 }
222 if (threadpool_lock) {
223 SDL_DestroyMutex(threadpool_lock);
224 threadpool_lock = NULL;
225 }
226 }
227
228 SDL_SetInitialized(&threadpool_init, okay);
229 }
230 return okay;
231}
232
233static void ShutdownThreadpool(void)
234{
235 if (SDL_ShouldQuit(&threadpool_init)) {
236 SDL_LockMutex(threadpool_lock);
237
238 // cancel anything that's still pending.
239 SDL_AsyncIOTask *task;
240 while ((task = LINKED_LIST_START(threadpool_tasks, threadpool)) != NULL) {
241 LINKED_LIST_UNLINK(task, threadpool);
242 task->result = SDL_ASYNCIO_CANCELED;
243 AsyncIOTaskComplete(task);
244 }
245
246 stop_threadpool = true;
247 SDL_BroadcastCondition(threadpool_condition); // tell the whole threadpool to wake up and quit.
248
249 while (running_threadpool_threads > 0) {
250 // each threadpool thread will broadcast this condition before it terminates if stop_threadpool is set.
251 // we can't just join the threads because they are detached, so the thread pool can automatically shrink as necessary.
252 SDL_WaitCondition(threadpool_condition, threadpool_lock);
253 }
254
255 SDL_UnlockMutex(threadpool_lock);
256
257 SDL_DestroyMutex(threadpool_lock);
258 threadpool_lock = NULL;
259 SDL_DestroyCondition(threadpool_condition);
260 threadpool_condition = NULL;
261
262 max_threadpool_threads = running_threadpool_threads = idle_threadpool_threads = threadpool_threads_spun = 0;
263
264 stop_threadpool = false;
265 SDL_SetInitialized(&threadpool_init, false);
266 }
267}
268#endif
269
270
271static Sint64 generic_asyncio_size(void *userdata)
272{
273 GenericAsyncIOData *data = (GenericAsyncIOData *) userdata;
274 return SDL_GetIOSize(data->io);
275}
276
277static bool generic_asyncio_io(void *userdata, SDL_AsyncIOTask *task)
278{
279 return task->queue->iface.queue_task(task->queue->userdata, task);
280}
281
282static void generic_asyncio_destroy(void *userdata)
283{
284 GenericAsyncIOData *data = (GenericAsyncIOData *) userdata;
285 SDL_DestroyMutex(data->lock);
286 SDL_free(data);
287}
288
289
290static bool generic_asyncioqueue_queue_task(void *userdata, SDL_AsyncIOTask *task)
291{
292 #if SDL_ASYNCIO_USE_THREADPOOL
293 QueueAsyncIOTask(task);
294 #else
295 SynchronousIO(task); // oh well. Get a better platform.
296 #endif
297 return true;
298}
299
300static void generic_asyncioqueue_cancel_task(void *userdata, SDL_AsyncIOTask *task)
301{
302 #if !SDL_ASYNCIO_USE_THREADPOOL // in theory, this was all synchronous and should never call this, but just in case.
303 task->result = SDL_ASYNCIO_CANCELED;
304 AsyncIOTaskComplete(task);
305 #else
306 // we can't stop i/o that's in-flight, but we _can_ just refuse to start it if the threadpool hadn't picked it up yet.
307 SDL_LockMutex(threadpool_lock);
308 if (LINKED_LIST_PREV(task, threadpool) != NULL) { // still in the queue waiting to be run? Take it out.
309 LINKED_LIST_UNLINK(task, threadpool);
310 task->result = SDL_ASYNCIO_CANCELED;
311 AsyncIOTaskComplete(task);
312 }
313 SDL_UnlockMutex(threadpool_lock);
314 #endif
315}
316
317static SDL_AsyncIOTask *generic_asyncioqueue_get_results(void *userdata)
318{
319 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
320 SDL_LockMutex(data->lock);
321 SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue);
322 if (task) {
323 LINKED_LIST_UNLINK(task, queue);
324 }
325 SDL_UnlockMutex(data->lock);
326 return task;
327}
328
329static SDL_AsyncIOTask *generic_asyncioqueue_wait_results(void *userdata, Sint32 timeoutMS)
330{
331 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
332 SDL_LockMutex(data->lock);
333 SDL_AsyncIOTask *task = LINKED_LIST_START(data->completed_tasks, queue);
334 if (!task) {
335 SDL_WaitConditionTimeout(data->condition, data->lock, timeoutMS);
336 task = LINKED_LIST_START(data->completed_tasks, queue);
337 }
338 if (task) {
339 LINKED_LIST_UNLINK(task, queue);
340 }
341 SDL_UnlockMutex(data->lock);
342 return task;
343}
344
345static void generic_asyncioqueue_signal(void *userdata)
346{
347 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
348 SDL_LockMutex(data->lock);
349 SDL_BroadcastCondition(data->condition);
350 SDL_UnlockMutex(data->lock);
351}
352
353static void generic_asyncioqueue_destroy(void *userdata)
354{
355 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) userdata;
356 SDL_DestroyMutex(data->lock);
357 SDL_DestroyCondition(data->condition);
358 SDL_free(data);
359}
360
361bool SDL_SYS_CreateAsyncIOQueue_Generic(SDL_AsyncIOQueue *queue)
362{
363 #if SDL_ASYNCIO_USE_THREADPOOL
364 if (!PrepareThreadpool()) {
365 return false;
366 }
367 #endif
368
369 GenericAsyncIOQueueData *data = (GenericAsyncIOQueueData *) SDL_calloc(1, sizeof (*data));
370 if (!data) {
371 return false;
372 }
373
374 data->lock = SDL_CreateMutex();
375 if (!data->lock) {
376 SDL_free(data);
377 return false;
378 }
379
380 data->condition = SDL_CreateCondition();
381 if (!data->condition) {
382 SDL_DestroyMutex(data->lock);
383 SDL_free(data);
384 return false;
385 }
386
387 static const SDL_AsyncIOQueueInterface SDL_AsyncIOQueue_Generic = {
388 generic_asyncioqueue_queue_task,
389 generic_asyncioqueue_cancel_task,
390 generic_asyncioqueue_get_results,
391 generic_asyncioqueue_wait_results,
392 generic_asyncioqueue_signal,
393 generic_asyncioqueue_destroy
394 };
395
396 SDL_copyp(&queue->iface, &SDL_AsyncIOQueue_Generic);
397 queue->userdata = data;
398 return true;
399}
400
401
402bool SDL_SYS_AsyncIOFromFile_Generic(const char *file, const char *mode, SDL_AsyncIO *asyncio)
403{
404 #if SDL_ASYNCIO_USE_THREADPOOL
405 if (!PrepareThreadpool()) {
406 return false;
407 }
408 #endif
409
410 GenericAsyncIOData *data = (GenericAsyncIOData *) SDL_calloc(1, sizeof (*data));
411 if (!data) {
412 return false;
413 }
414
415 data->lock = SDL_CreateMutex();
416 if (!data->lock) {
417 SDL_free(data);
418 return false;
419 }
420
421 data->io = SDL_IOFromFile(file, mode);
422 if (!data->io) {
423 SDL_DestroyMutex(data->lock);
424 SDL_free(data);
425 return false;
426 }
427
428 static const SDL_AsyncIOInterface SDL_AsyncIOFile_Generic = {
429 generic_asyncio_size,
430 generic_asyncio_io,
431 generic_asyncio_io,
432 generic_asyncio_io,
433 generic_asyncio_destroy
434 };
435
436 SDL_copyp(&asyncio->iface, &SDL_AsyncIOFile_Generic);
437 asyncio->userdata = data;
438 return true;
439}
440
441void SDL_SYS_QuitAsyncIO_Generic(void)
442{
443 #if SDL_ASYNCIO_USE_THREADPOOL
444 ShutdownThreadpool();
445 #endif
446}
447
448
449#if SDL_ASYNCIO_ONLY_HAVE_GENERIC
450bool SDL_SYS_AsyncIOFromFile(const char *file, const char *mode, SDL_AsyncIO *asyncio)
451{
452 return SDL_SYS_AsyncIOFromFile_Generic(file, mode, asyncio);
453}
454
455bool SDL_SYS_CreateAsyncIOQueue(SDL_AsyncIOQueue *queue)
456{
457 return SDL_SYS_CreateAsyncIOQueue_Generic(queue);
458}
459
460void SDL_SYS_QuitAsyncIO(void)
461{
462 SDL_SYS_QuitAsyncIO_Generic();
463}
464#endif
465
466