1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
10 * @a Niels Nes, Peter Boncz
11 * @+ Threads
12 * This file contains a wrapper layer for threading, hence the
13 * underscore convention MT_x (Multi-Threading). As all platforms
14 * that MonetDB runs on now support POSIX Threads (pthreads), this
15 * wrapping layer has become rather thin.
16 *
17 * In the late 1990s when multi-threading support was introduced in
18 * MonetDB, pthreads was just emerging as a standard API and not
 * widely adopted yet. The earliest MT implementation focused on SGI
 * Unix and provided multi-threading using multiple processes and
 * shared memory.
22 *
23 * One of the relics of this model, namely the need to pre-allocate
24 * locks and semaphores, and consequently a maximum number of them,
25 * has been removed in the latest iteration of this layer.
26 *
27 */
28/*
29 * @- Mthreads Routine implementations
30 */
31#include "monetdb_config.h"
32#include "mstring.h"
33#include "gdk_system.h"
34#include "gdk_system_private.h"
35
36#include <time.h>
37
38#ifdef HAVE_FTIME
39#include <sys/timeb.h> /* ftime */
40#endif
41#ifdef HAVE_SYS_TIME_H
42#include <sys/time.h> /* gettimeofday */
43#endif
44
45#include <signal.h>
46#include <string.h> /* for strerror */
47#include <unistd.h> /* for sysconf symbols */
48
49#ifdef LOCK_STATS
50
/* global lock statistics counters; GDKlockstatistics() reports them as
 * "total lock count", "lock contention" and "lock sleep count" */
ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0);
ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0);
ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0);
/* head of the list of locks, chained through their next field */
MT_Lock *volatile GDKlocklist = 0;
/* test-and-set flag that GDKlockstatistics() takes before it touches
 * GDKlocklist */
ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT;
56
57/* merge sort of linked list */
58static MT_Lock *
59sortlocklist(MT_Lock *l)
60{
61 MT_Lock *r, *t, *ll = NULL;
62
63 if (l == NULL || l->next == NULL) {
64 /* list is trivially sorted (0 or 1 element) */
65 return l;
66 }
67 /* break list into two (almost) equal pieces:
68 * l is start of "left" list, r of "right" list, ll last
69 * element of "left" list */
70 for (t = r = l; t && t->next; t = t->next->next) {
71 ll = r;
72 r = r->next;
73 }
74 ll->next = NULL; /* break list into two */
75 /* recursively sort both sublists */
76 l = sortlocklist(l);
77 r = sortlocklist(r);
78 /* merge
79 * t is new list, ll is last element of new list, l and r are
80 * start of unprocessed part of left and right lists */
81 t = ll = NULL;
82 while (l && r) {
83 if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
84 (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
85 (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
86 (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
87 l->count <= r->count)))) {
88 /* l is smaller */
89 if (ll == NULL) {
90 assert(t == NULL);
91 t = ll = l;
92 } else {
93 ll->next = l;
94 ll = ll->next;
95 }
96 l = l->next;
97 } else {
98 /* r is smaller */
99 if (ll == NULL) {
100 assert(t == NULL);
101 t = ll = r;
102 } else {
103 ll->next = r;
104 ll = ll->next;
105 }
106 r = r->next;
107 }
108 }
109 /* append rest of remaining list */
110 ll->next = l ? l : r;
111 return t;
112}
113
114static inline bool
115lock_isset(MT_Lock *l)
116{
117 if (MT_lock_try(l)) {
118 MT_lock_unset(l);
119 return false;
120 }
121 return true;
122}
123
124void
125GDKlockstatistics(int what)
126{
127 MT_Lock *l;
128 int n = 0;
129
130 if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
131 fprintf(stderr, "#WARNING: GDKlocklistlock is set, so cannot access lock list\n");
132 return;
133 }
134 if (what == -1) {
135 for (l = GDKlocklist; l; l = l->next) {
136 l->count = 0;
137 ATOMIC_SET(&l->contention, 0);
138 ATOMIC_SET(&l->sleep, 0);
139 }
140 ATOMIC_CLEAR(&GDKlocklistlock);
141 return;
142 }
143 GDKlocklist = sortlocklist(GDKlocklist);
144 fprintf(stderr, "# lock name\tcount\tcontention\tsleep\tlocked\t(un)locker\tthread\n");
145 for (l = GDKlocklist; l; l = l->next) {
146 n++;
147 if (what == 0 ||
148 (what == 1 && l->count) ||
149 (what == 2 && ATOMIC_GET(&l->contention)) ||
150 (what == 3 && lock_isset(l)))
151 fprintf(stderr, "# %-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
152 l->name, l->count,
153 (size_t) ATOMIC_GET(&l->contention),
154 (size_t) ATOMIC_GET(&l->sleep),
155 lock_isset(l) ? "locked" : "",
156 l->locker ? l->locker : "",
157 l->thread ? l->thread : "");
158 }
159 fprintf(stderr, "#number of locks %d\n", n);
160 fprintf(stderr, "#total lock count %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
161 fprintf(stderr, "#lock contention %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
162 fprintf(stderr, "#lock sleep count %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
163 ATOMIC_CLEAR(&GDKlocklistlock);
164}
165
166#endif /* LOCK_STATS */
167
168#if !defined(HAVE_PTHREAD_H) && defined(WIN32)
/* Bookkeeping record of one thread in the native Windows
 * implementation.  The records of threads made by MT_create_thread
 * are chained through next into the list headed by winthreads; the
 * code in this file walks and changes that list while holding
 * winthread_cs. */
static struct winthread {
	struct winthread *next;	/* next record in the winthreads list */
	HANDLE hdl;		/* handle returned by CreateThread */
	DWORD tid;		/* Windows thread id */
	void (*func) (void *);	/* function that the thread executes */
	void *data;		/* argument for func until the thread
				 * starts, then the value set with
				 * MT_thread_setdata */
	MT_Lock *lockwait;	/* lock we're waiting for */
	MT_Sema *semawait;	/* semaphore we're waiting for */
	struct winthread *joinwait; /* process we are joining with */
	const char *working;	/* what we're currently doing */
	ATOMIC_TYPE exited;	/* set to 1 when func has returned or
				 * MT_exiting_thread was called */
	/* detached: created with MT_THR_DETACHED;
	 * waiting: some thread has already started joining this one */
	bool detached:1, waiting:1;
	char threadname[16];	/* name used in debug output */
} *winthreads = NULL;
/* record of the thread that calls MT_thread_init; the code visible
 * here never links it into winthreads */
static struct winthread mainthread = {
	.threadname = "main thread",
	.exited = ATOMIC_VAR_INIT(0),
};

/* critical section held while walking or changing winthreads */
static CRITICAL_SECTION winthread_cs;
/* TLS index under which each thread stores the pointer to its own
 * record; TLS_OUT_OF_INDEXES as long as MT_thread_init did not succeed */
static DWORD threadslot = TLS_OUT_OF_INDEXES;
190
191void
192dump_threads(void)
193{
194 EnterCriticalSection(&winthread_cs);
195 for (struct winthread *w = winthreads; w; w = w->next) {
196 fprintf(stderr, "%s, waiting for %s, working on %.200s\n",
197 w->threadname,
198 w->lockwait ? w->lockwait->name :
199 w->semawait ? w->semawait->name :
200 w->joinwait ? w->joinwait->threadname :
201 "nothing",
202 ATOMIC_GET(&w->exited) ? "exiting" :
203 w->working ? w->working : "nothing");
204 }
205 LeaveCriticalSection(&winthread_cs);
206}
207
208bool
209MT_thread_init(void)
210{
211 if (threadslot == TLS_OUT_OF_INDEXES) {
212 threadslot = TlsAlloc();
213 if (threadslot == TLS_OUT_OF_INDEXES)
214 return false;
215 mainthread.tid = GetCurrentThreadId();
216 if (TlsSetValue(threadslot, &mainthread) == 0) {
217 TlsFree(threadslot);
218 threadslot = TLS_OUT_OF_INDEXES;
219 return false;
220 }
221 InitializeCriticalSection(&winthread_cs);
222 }
223 return true;
224}
225
226static struct winthread *
227find_winthread(DWORD tid)
228{
229 struct winthread *w;
230
231 EnterCriticalSection(&winthread_cs);
232 for (w = winthreads; w && w->tid != tid; w = w->next)
233 ;
234 LeaveCriticalSection(&winthread_cs);
235 return w;
236}
237
238const char *
239MT_thread_getname(void)
240{
241 struct winthread *w = TlsGetValue(threadslot);
242 return w ? w->threadname : "unknown thread";
243}
244
245void
246MT_thread_setdata(void *data)
247{
248 struct winthread *w = TlsGetValue(threadslot);
249
250 if (w)
251 w->data = data;
252}
253
254void
255MT_thread_setlockwait(MT_Lock *lock)
256{
257 struct winthread *w = TlsGetValue(threadslot);
258
259 if (w)
260 w->lockwait = lock;
261}
262
263void
264MT_thread_setsemawait(MT_Sema *sema)
265{
266 struct winthread *w = TlsGetValue(threadslot);
267
268 if (w)
269 w->semawait = sema;
270}
271
272void
273MT_thread_setworking(const char *work)
274{
275 struct winthread *w = TlsGetValue(threadslot);
276
277 if (w)
278 w->working = work;
279}
280
281void *
282MT_thread_getdata(void)
283{
284 struct winthread *w = TlsGetValue(threadslot);
285
286 return w ? w->data : NULL;
287}
288
289static void
290rm_winthread(struct winthread *w)
291{
292 struct winthread **wp;
293
294 EnterCriticalSection(&winthread_cs);
295 for (wp = &winthreads; *wp && *wp != w; wp = &(*wp)->next)
296 ;
297 if (*wp)
298 *wp = w->next;
299 LeaveCriticalSection(&winthread_cs);
300 ATOMIC_DESTROY(&w->exited);
301 free(w);
302}
303
304static DWORD WINAPI
305thread_starter(LPVOID arg)
306{
307 struct winthread *w = (struct winthread *) arg;
308 void *data = w->data;
309
310 w->data = NULL;
311 TlsSetValue(threadslot, w);
312 (*w->func)(data);
313 ATOMIC_SET(&w->exited, 1);
314 THRDDEBUG fprintf(stderr, "#exit \"%s\"\n", w->threadname);
315 return 0;
316}
317
318static void
319join_threads(void)
320{
321 bool waited;
322
323 struct winthread *self = TlsGetValue(threadslot);
324 EnterCriticalSection(&winthread_cs);
325 do {
326 waited = false;
327 for (struct winthread *w = winthreads; w; w = w->next) {
328 if (w->detached && !w->waiting && ATOMIC_GET(&w->exited)) {
329 w->waiting = true;
330 LeaveCriticalSection(&winthread_cs);
331 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), w->threadname);
332 self->joinwait = w;
333 WaitForSingleObject(w->hdl, INFINITE);
334 self->joinwait = NULL;
335 CloseHandle(w->hdl);
336 rm_winthread(w);
337 waited = true;
338 EnterCriticalSection(&winthread_cs);
339 break;
340 }
341 }
342 } while (waited);
343 LeaveCriticalSection(&winthread_cs);
344}
345
346void
347join_detached_threads(void)
348{
349 bool waited;
350
351 struct winthread *self = TlsGetValue(threadslot);
352 EnterCriticalSection(&winthread_cs);
353 do {
354 waited = false;
355 for (struct winthread *w = winthreads; w; w = w->next) {
356 if (w->detached && !w->waiting) {
357 w->waiting = true;
358 LeaveCriticalSection(&winthread_cs);
359 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), w->threadname);
360 self->joinwait = w;
361 WaitForSingleObject(w->hdl, INFINITE);
362 self->joinwait = NULL;
363 CloseHandle(w->hdl);
364 rm_winthread(w);
365 waited = true;
366 EnterCriticalSection(&winthread_cs);
367 break;
368 }
369 }
370 } while (waited);
371 LeaveCriticalSection(&winthread_cs);
372}
373
374int
375MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
376{
377 struct winthread *w = malloc(sizeof(*w));
378
379 if (w == NULL)
380 return -1;
381
382 join_threads();
383 *w = (struct winthread) {
384 .func = f,
385 .data = arg,
386 .waiting = false,
387 .detached = (d == MT_THR_DETACHED),
388 };
389 ATOMIC_INIT(&w->exited, 0);
390 strcpy_len(w->threadname, threadname, sizeof(w->threadname));
391 THRDDEBUG fprintf(stderr, "#create \"%s\" \"%s\"\n", MT_thread_getname(), threadname);
392 EnterCriticalSection(&winthread_cs);
393 w->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, w,
394 0, &w->tid);
395 if (w->hdl == NULL) {
396 LeaveCriticalSection(&winthread_cs);
397 return -1;
398 }
399 w->next = winthreads;
400 winthreads = w;
401 LeaveCriticalSection(&winthread_cs);
402 /* must not fail after this: the thread has been started */
403 *t = (MT_Id) w->tid;
404 return 0;
405}
406
407MT_Id
408MT_getpid(void)
409{
410 return (MT_Id) GetCurrentThreadId();
411}
412
413void
414MT_exiting_thread(void)
415{
416 struct winthread *w = TlsGetValue(threadslot);
417
418 if (w) {
419 ATOMIC_SET(&w->exited, 1);
420 w->working = NULL;
421 }
422}
423
424int
425MT_join_thread(MT_Id t)
426{
427 struct winthread *w;
428
429 assert(t != mainthread.tid);
430 join_threads();
431 w = find_winthread((DWORD) t);
432 if (w == NULL || w->hdl == NULL)
433 return -1;
434 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), w->threadname);
435 struct winthread *self = TlsGetValue(threadslot);
436 self->joinwait = w;
437 DWORD ret = WaitForSingleObject(w->hdl, INFINITE);
438 self->joinwait = NULL;
439 if (ret == WAIT_OBJECT_0 && CloseHandle(w->hdl)) {
440 rm_winthread(w);
441 return 0;
442 }
443 return -1;
444}
445
446int
447MT_kill_thread(MT_Id t)
448{
449 struct winthread *w;
450
451 assert(t != mainthread.tid);
452 join_threads();
453 w = find_winthread((DWORD) t);
454 if (w == NULL)
455 return -1;
456 if (w->hdl == NULL) {
457 /* detached thread */
458 HANDLE h;
459 int ret = 0;
460 h = OpenThread(THREAD_ALL_ACCESS, 0, (DWORD) t);
461 if (h == NULL)
462 return -1;
463 if (TerminateThread(h, -1))
464 ret = -1;
465 CloseHandle(h);
466 return ret;
467 }
468 if (TerminateThread(w->hdl, -1))
469 return 0;
470 return -1;
471}
472
#else /* !(!defined(HAVE_PTHREAD_H) && defined(WIN32)) */
474
/* Bookkeeping record of one thread in the pthreads implementation.
 * The records of threads made by MT_create_thread are chained through
 * next into the list headed by posthreads; the code in this file
 * walks and changes that list while holding posthread_lock. */
static struct posthread {
	struct posthread *next;	/* next record in the posthreads list */
	void (*func)(void *);	/* function that the thread executes */
	void *data;		/* argument for func until the thread
				 * starts, then the value set with
				 * MT_thread_setdata */
	MT_Lock *lockwait;	/* lock we're waiting for */
	MT_Sema *semawait;	/* semaphore we're waiting for */
	struct posthread *joinwait; /* process we are joining with */
	const char *working;	/* what we're currently doing */
	char threadname[16];	/* name used in debug output */
	pthread_t tid;		/* pthreads handle of the thread */
	MT_Id mtid;		/* id handed out to callers, see MT_thread_id */
	ATOMIC_TYPE exited;	/* set to 1 when func has returned or
				 * MT_exiting_thread was called */
	/* detached: created with MT_THR_DETACHED;
	 * waiting: some thread has already started joining this one */
	bool detached:1, waiting:1;
} *posthreads = NULL;
/* record of the thread that calls MT_thread_init; it has id 1 and the
 * code visible here never links it into posthreads */
static struct posthread mainthread = {
	.threadname = "main thread",
	.mtid = 1,
	.exited = ATOMIC_VAR_INIT(0),
};
/* mutex held while walking or changing posthreads and while handing
 * out a new thread id */
static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
/* most recently handed out thread id; MT_create_thread increments it
 * for each new thread, so created threads get ids larger than 1 */
static MT_Id MT_thread_id = 1;

/* key under which each thread stores the pointer to its own record */
static pthread_key_t threadkey;
498
499void
500dump_threads(void)
501{
502 pthread_mutex_lock(&posthread_lock);
503 for (struct posthread *p = posthreads; p; p = p->next) {
504 fprintf(stderr, "%s, waiting for %s, working on %.200s\n",
505 p->threadname,
506 p->lockwait ? p->lockwait->name :
507 p->semawait ? p->semawait->name :
508 p->joinwait ? p->joinwait->threadname :
509 "nothing",
510 ATOMIC_GET(&p->exited) ? "exiting" :
511 p->working ? p->working : "nothing");
512 }
513 pthread_mutex_unlock(&posthread_lock);
514}
515
516bool
517MT_thread_init(void)
518{
519 int ret;
520
521 if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
522 fprintf(stderr,
523 "#MT_thread_init: creating specific key for thread "
524 "failed: %s\n", strerror(ret));
525 return false;
526 }
527 mainthread.tid = pthread_self();
528 if ((ret = pthread_setspecific(threadkey, &mainthread)) != 0) {
529 fprintf(stderr,
530 "#MT_thread_init: setting specific value failed: %s\n",
531 strerror(ret));
532 }
533 return true;
534}
535
536static struct posthread *
537find_posthread(MT_Id tid)
538{
539 struct posthread *p;
540
541 pthread_mutex_lock(&posthread_lock);
542 for (p = posthreads; p && p->mtid != tid; p = p->next)
543 ;
544 pthread_mutex_unlock(&posthread_lock);
545 return p;
546}
547
548const char *
549MT_thread_getname(void)
550{
551 struct posthread *p;
552
553 p = pthread_getspecific(threadkey);
554 return p ? p->threadname : "unknown thread";
555}
556
557void
558MT_thread_setdata(void *data)
559{
560 struct posthread *p = pthread_getspecific(threadkey);
561
562 if (p)
563 p->data = data;
564}
565
566void *
567MT_thread_getdata(void)
568{
569 struct posthread *p = pthread_getspecific(threadkey);
570
571 return p ? p->data : NULL;
572}
573
574void
575MT_thread_setlockwait(MT_Lock *lock)
576{
577 struct posthread *p = pthread_getspecific(threadkey);
578
579 if (p)
580 p->lockwait = lock;
581}
582
583void
584MT_thread_setsemawait(MT_Sema *sema)
585{
586 struct posthread *p = pthread_getspecific(threadkey);
587
588 if (p)
589 p->semawait = sema;
590}
591
592void
593MT_thread_setworking(const char *work)
594{
595 struct posthread *p = pthread_getspecific(threadkey);
596
597 if (p)
598 p->working = work;
599}
600
601#ifdef HAVE_PTHREAD_SIGMASK
602static void
603MT_thread_sigmask(sigset_t *new_mask, sigset_t *orig_mask)
604{
605 /* do not check for errors! */
606 sigdelset(new_mask, SIGQUIT);
607 sigdelset(new_mask, SIGPROF);
608 pthread_sigmask(SIG_SETMASK, new_mask, orig_mask);
609}
610#endif
611
612static void
613rm_posthread_locked(struct posthread *p)
614{
615 struct posthread **pp;
616
617 for (pp = &posthreads; *pp && *pp != p; pp = &(*pp)->next)
618 ;
619 if (*pp)
620 *pp = p->next;
621 ATOMIC_DESTROY(&p->exited);
622 free(p);
623}
624
625static void
626rm_posthread(struct posthread *p)
627{
628 pthread_mutex_lock(&posthread_lock);
629 rm_posthread_locked(p);
630 pthread_mutex_unlock(&posthread_lock);
631}
632
633static void *
634thread_starter(void *arg)
635{
636 struct posthread *p = (struct posthread *) arg;
637 void *data = p->data;
638
639 p->data = NULL;
640 pthread_setspecific(threadkey, p);
641 (*p->func)(data);
642 ATOMIC_SET(&p->exited, 1);
643 THRDDEBUG fprintf(stderr, "#exit \"%s\"\n", p->threadname);
644 return NULL;
645}
646
647static void
648join_threads(void)
649{
650 bool waited;
651
652 struct posthread *self = pthread_getspecific(threadkey);
653 pthread_mutex_lock(&posthread_lock);
654 do {
655 waited = false;
656 for (struct posthread *p = posthreads; p; p = p->next) {
657 if (p->detached && !p->waiting && ATOMIC_GET(&p->exited)) {
658 p->waiting = true;
659 pthread_mutex_unlock(&posthread_lock);
660 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), p->threadname);
661 self->joinwait = p;
662 pthread_join(p->tid, NULL);
663 self->joinwait = NULL;
664 rm_posthread(p);
665 waited = true;
666 pthread_mutex_lock(&posthread_lock);
667 break;
668 }
669 }
670 } while (waited);
671 pthread_mutex_unlock(&posthread_lock);
672}
673
674void
675join_detached_threads(void)
676{
677 bool waited;
678
679 struct posthread *self = pthread_getspecific(threadkey);
680 pthread_mutex_lock(&posthread_lock);
681 do {
682 waited = false;
683 for (struct posthread *p = posthreads; p; p = p->next) {
684 if (p->detached && !p->waiting) {
685 p->waiting = true;
686 pthread_mutex_unlock(&posthread_lock);
687 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), p->threadname);
688 self->joinwait = p;
689 pthread_join(p->tid, NULL);
690 self->joinwait = NULL;
691 rm_posthread(p);
692 waited = true;
693 pthread_mutex_lock(&posthread_lock);
694 break;
695 }
696 }
697 } while (waited);
698 pthread_mutex_unlock(&posthread_lock);
699}
700
701int
702MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
703{
704 pthread_attr_t attr;
705 int ret;
706 struct posthread *p;
707 size_t tlen;
708
709 join_threads();
710 if (threadname == NULL) {
711 fprintf(stderr, "#MT_create_thread: thread must have a name\n");
712 return -1;
713 }
714 tlen = strlen(threadname);
715 if (tlen >= sizeof(p->threadname)) {
716 fprintf(stderr, "#MT_create_thread: thread's name is too large\n");
717 return -1;
718 }
719 if ((ret = pthread_attr_init(&attr)) != 0) {
720 fprintf(stderr,
721 "#MT_create_thread: cannot init pthread attr: %s\n",
722 strerror(ret));
723 return -1;
724 }
725 if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
726 fprintf(stderr,
727 "#MT_create_thread: cannot set stack size: %s\n",
728 strerror(ret));
729 pthread_attr_destroy(&attr);
730 return -1;
731 }
732 p = malloc(sizeof(struct posthread));
733 if (p == NULL) {
734 fprintf(stderr,
735 "#MT_create_thread: cannot allocate memory: %s\n",
736 strerror(errno));
737 pthread_attr_destroy(&attr);
738 return -1;
739 }
740 *p = (struct posthread) {
741 .func = f,
742 .data = arg,
743 .waiting = false,
744 .detached = (d == MT_THR_DETACHED),
745 };
746 ATOMIC_INIT(&p->exited, 0);
747
748 memcpy(p->threadname, threadname, tlen + 1);
749#ifdef HAVE_PTHREAD_SIGMASK
750 sigset_t new_mask, orig_mask;
751 (void) sigfillset(&new_mask);
752 MT_thread_sigmask(&new_mask, &orig_mask);
753#endif
754 THRDDEBUG fprintf(stderr, "#create \"%s\" \"%s\"\n", MT_thread_getname(), threadname);
755 /* protect posthreads during thread creation and only add to
756 * it after the thread was created successfully */
757 pthread_mutex_lock(&posthread_lock);
758 *t = p->mtid = ++MT_thread_id;
759 ret = pthread_create(&p->tid, &attr, thread_starter, p);
760 if (ret != 0) {
761 fprintf(stderr,
762 "#MT_create_thread: cannot start thread: %s\n",
763 strerror(ret));
764 ret = -1;
765 } else {
766 /* must not fail after this: the thread has been started */
767 p->next = posthreads;
768 posthreads = p;
769 }
770 pthread_mutex_unlock(&posthread_lock);
771#ifdef HAVE_PTHREAD_SIGMASK
772 MT_thread_sigmask(&orig_mask, NULL);
773#endif
774 return ret;
775}
776
777MT_Id
778MT_getpid(void)
779{
780 struct posthread *p;
781
782 p = pthread_getspecific(threadkey);
783 return p ? p->mtid : 0;
784}
785
786void
787MT_exiting_thread(void)
788{
789 struct posthread *p;
790
791 p = pthread_getspecific(threadkey);
792 if (p) {
793 ATOMIC_SET(&p->exited, 1);
794 p->working = NULL;
795 }
796}
797
798int
799MT_join_thread(MT_Id t)
800{
801 struct posthread *p;
802 int ret;
803
804 assert(t > 1);
805 join_threads();
806 p = find_posthread(t);
807 if (p == NULL)
808 return -1;
809 THRDDEBUG fprintf(stderr, "#join \"%s\" \"%s\"\n", MT_thread_getname(), p->threadname);
810 struct posthread *self = pthread_getspecific(threadkey);
811 self->joinwait = p;
812 ret = pthread_join(p->tid, NULL);
813 self->joinwait = NULL;
814 if (ret != 0) {
815 fprintf(stderr, "#MT_join_thread: joining thread failed: %s\n",
816 strerror(ret));
817 return -1;
818 }
819 rm_posthread(p);
820 return 0;
821}
822
823int
824MT_kill_thread(MT_Id t)
825{
826 assert(t > 1);
827#ifdef HAVE_PTHREAD_KILL
828 struct posthread *p;
829
830 join_threads();
831 p = find_posthread(t);
832 if (p)
833 return pthread_kill(p->tid, SIGHUP);
834#else
835 (void) t;
836 join_threads();
837#endif
838 return -1;
839}
840#endif
841
#ifndef WIN32
/* Count the CPUs listed on one line of a cgroup cpuset.cpus file.
 * The syntax is: ranges of CPU numbers separated by comma, ended by a
 * newline; a range is either a single CPU id, or two ids separated by
 * a minus of which the second must be the larger one.  Returns true
 * and stores the count in *count when the line is well-formed;
 * returns false (and leaves *count alone) on any deviation. */
static bool
count_cpuset_cpus(const char *line, int *count)
{
	int total = 0;
	const char *cur = line;

	for (;;) {
		char *end;
		unsigned first = strtoul(cur, &end, 10);

		if (end == cur) {
			return false;	/* no number here */
		}
		total++;
		if (*end == '-') {
			cur = end + 1;
			unsigned last = strtoul(cur, &end, 10);
			if (end == cur || last <= first) {
				return false;	/* bad end of range */
			}
			total += last - first;
		}
		if (*end == '\n') {
			*count = total;
			return true;
		}
		if (*end != ',') {
			return false;
		}
		cur = end + 1;
	}
}
#endif

/* Return the number of CPU cores to use; the result is always at
 * least 1.  The number comes from sysconf, sysctl or GetSystemInfo,
 * depending on what the platform offers, and defaults to 1 when none
 * of these yields a positive value.  On non-Windows systems the result
 * is lowered to the number of CPUs in the cgroup cpuset, if that file
 * can be read and parsed and lists fewer CPUs. */
int
MT_check_nr_cores(void)
{
	int ncpus = -1;

#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
	/* this works on Linux, Solaris and AIX */
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(HW_NCPU)   /* BSD */
	size_t len = sizeof(int);
	int mib[3];

	/* Everyone should have permission to make this call,
	 * if we get a failure something is really wrong. */
	mib[0] = CTL_HW;
	mib[1] = HW_NCPU;
	mib[2] = -1;
	sysctl(mib, 3, &ncpus, &len, NULL, 0);
#elif defined(WIN32)
	SYSTEM_INFO sysinfo;

	GetSystemInfo(&sysinfo);
	ncpus = sysinfo.dwNumberOfProcessors;
#endif

	/* if we ever need HPUX or OSF/1 (hope not), see
	 * http://ndevilla.free.fr/threads/ */

	if (ncpus <= 0) {
		ncpus = 1;
	}
#if SIZEOF_SIZE_T == SIZEOF_INT
	/* On 32-bits systems with large numbers of cpus/cores, we
	 * quickly run out of space due to the number of threads in
	 * use.  Since it is questionable whether many cores on a
	 * 32-bits system are going to be beneficial due to this, we
	 * simply limit the auto-detected cores to 16 on 32-bits
	 * systems.  The user can always override this via
	 * gdk_nr_threads. */
	if (ncpus > 16) {
		ncpus = 16;
	}
#endif

#ifndef WIN32
	/* get the number of allocated cpus from the cgroup settings;
	 * a file that is missing, empty or malformed is ignored */
	FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
	if (f == NULL) {
		return ncpus;
	}
	char buf[512];
	char *line = fgets(buf, 512, f);
	fclose(f);

	int allowed;
	if (line != NULL &&
	    count_cpuset_cpus(line, &allowed) &&
	    allowed < ncpus) {
		return allowed;
	}
#endif

	return ncpus;
}
924