1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
// All platform-specific threading support is encapsulated here.
18
19#ifndef __RML_thread_monitor_H
20#define __RML_thread_monitor_H
21
22#if USE_WINTHREAD
23#include <windows.h>
24#include <process.h>
25#include <malloc.h> //_alloca
26#include "tbb/tbb_misc.h" // support for processor groups
27#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
28#include <thread>
29#endif
30#elif USE_PTHREAD
31#include <pthread.h>
32#include <string.h>
33#include <stdlib.h>
34#else
35#error Unsupported platform
36#endif
37#include <stdio.h>
38#include "tbb/itt_notify.h"
39#include "tbb/atomic.h"
40#include "tbb/semaphore.h"
41
42// All platform-specific threading support is in this header.
43
#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing. The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile"; otherwise the compiler might remove the _alloca.
//
// The macro expands to declarations of "offset" and "sink_for_alloca", so it can be
// used at most once per scope. The argument is parenthesized so that any integral
// expression (not just a plain identifier) expands with the intended precedence.
#define AVOID_64K_ALIASING(idx)                         \
    size_t offset = ((idx)+1) * 40503U % (1U<<16);      \
    void* volatile sink_for_alloca = _alloca(offset);   \
    __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed");
#else
// Linux thread allocators avoid 64K aliasing.
#define AVOID_64K_ALIASING(idx) tbb::internal::suppress_unused_warning(idx)
#endif /* _WIN32||_WIN64 */
59
60namespace rml {
61
62namespace internal {
63
#if DO_ITT_NOTIFY
// Type name and object name passed to ITT_SYNC_CREATE by the thread_monitor
// constructor when it registers its semaphore.
// NOTE(review): these are only defined when DO_ITT_NOTIFY is set, yet the constructor
// names them unconditionally; presumably ITT_SYNC_CREATE discards its arguments
// otherwise -- confirm against tbb/itt_notify.h.
static const ::tbb::tchar *SyncType_RML = _T("%Constant");
static const ::tbb::tchar *SyncObj_ThreadMonitor = _T("RML Thr Monitor");
#endif /* DO_ITT_NOTIFY */
68
//! Monitor with limited two-phase commit form of wait.
/** At most one thread should wait on an instance at a time.
    The waiting thread calls prepare_wait() and then either commit_wait() or
    cancel_wait(); notify() may be called by any thread.
    The class also hosts static helpers for launching, joining, detaching and
    yielding threads on the platform selected by USE_WINTHREAD / USE_PTHREAD. */
class thread_monitor {
public:
    //! Snapshot of the monitor's epoch.
    /** Filled in by prepare_wait() and compared against the current epoch by commit_wait(). */
    class cookie {
        friend class thread_monitor;
        tbb::atomic<size_t> my_epoch;
    };
    //! Construct a monitor with epoch 0, not in a wait, and with no skipped wakeup pending.
    thread_monitor() : skipped_wakeup(false), my_sema() {
        my_cookie.my_epoch = 0;
        ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor);
        in_wait = false;
    }
    ~thread_monitor() {}

    //! If a thread is waiting or started a two-phase wait, notify it.
    /** Can be called by any thread. */
    void notify();

    //! Begin two-phase wait.
    /** Should only be called by thread that owns the monitor.
        The caller must either complete the wait or cancel it.
        On return, c holds a copy of the monitor's current epoch. */
    void prepare_wait( cookie& c );

    //! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
    /** If the epoch has changed since c was taken, the wait is cancelled instead of blocking. */
    void commit_wait( cookie& c );

    //! Cancel a two-phase wait.
    void cancel_wait();

#if USE_WINTHREAD
    //! Native thread handle type on Windows.
    typedef HANDLE handle_type;

    //! Return type and calling convention to use when declaring a thread routine.
    #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI
    typedef unsigned (WINAPI *thread_routine_type)(void*);

    //! Launch a thread
    /** If worker_index is non-NULL and more than one processor group is reported,
        the new thread is assigned to a processor group derived from *worker_index
        before it starts running. */
    static handle_type launch( thread_routine_type thread_routine, void* arg, size_t stack_size, const size_t* worker_index = NULL );

#elif USE_PTHREAD
    //! Native thread handle type with pthreads.
    typedef pthread_t handle_type;

    //! Return type to use when declaring a thread routine.
    #define __RML_DECL_THREAD_ROUTINE void*
    typedef void*(*thread_routine_type)(void*);

    //! Launch a thread
    /** A stack_size of 0 leaves the stack size attribute at its default.
        Any pthread error is reported via check(), which terminates the process. */
    static handle_type launch( thread_routine_type thread_routine, void* arg, size_t stack_size );
#endif /* USE_PTHREAD */

    //! Yield control to OS
    /** Affects the calling thread. **/
    static void yield();

    //! Join thread
    static void join(handle_type handle);

    //! Detach thread
    static void detach_thread(handle_type handle);
private:
    cookie my_cookie; // epoch counter
    //! True between prepare_wait() and the notify()/cancel_wait() that clears it.
    tbb::atomic<bool> in_wait;
    //! Set by cancel_wait() when a notify() had already cleared in_wait; consumed by the next prepare_wait().
    bool skipped_wakeup;
    //! Semaphore the waiting thread blocks on in commit_wait() and notify() signals.
    tbb::internal::binary_semaphore my_sema;
#if USE_PTHREAD
    //! Print a diagnostic and exit(1) if error_code is non-zero.
    static void check( int error_code, const char* routine );
#endif
};
136
137#if USE_WINTHREAD
138
139#ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
140#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
141#endif
142
143// _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead
144#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
145inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, size_t, const size_t*) {
146//TODO: check that exception thrown from std::thread is not swallowed silently
147 std::thread* thread_tmp=new std::thread(thread_function, arg);
148 return thread_tmp->native_handle();
149}
150#else
151inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, size_t stack_size, const size_t* worker_index ) {
152 unsigned thread_id;
153 int number_of_processor_groups = ( worker_index ) ? tbb::internal::NumberOfProcessorGroups() : 0;
154 unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0;
155 HANDLE h = (HANDLE)_beginthreadex( NULL, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id );
156 if( !h ) {
157 fprintf(stderr,"thread_monitor::launch: _beginthreadex failed\n");
158 exit(1);
159 }
160 if ( number_of_processor_groups > 1 ) {
161 tbb::internal::MoveThreadIntoProcessorGroup( h,
162 tbb::internal::FindProcessorGroupIndex( static_cast<int>(*worker_index) ) );
163 ResumeThread( h );
164 }
165 return h;
166}
167#endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
168
169void thread_monitor::join(handle_type handle) {
170#if TBB_USE_ASSERT
171 DWORD res =
172#endif
173 WaitForSingleObjectEx(handle, INFINITE, FALSE);
174 __TBB_ASSERT( res==WAIT_OBJECT_0, NULL );
175#if TBB_USE_ASSERT
176 BOOL val =
177#endif
178 CloseHandle(handle);
179 __TBB_ASSERT( val, NULL );
180}
181
182void thread_monitor::detach_thread(handle_type handle) {
183#if TBB_USE_ASSERT
184 BOOL val =
185#endif
186 CloseHandle(handle);
187 __TBB_ASSERT( val, NULL );
188}
189
190inline void thread_monitor::yield() {
191// TODO: consider unification via __TBB_Yield or tbb::this_tbb_thread::yield
192#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
193 std::this_thread::yield();
194#else
195 SwitchToThread();
196#endif
197}
198#endif /* USE_WINTHREAD */
199
200#if USE_PTHREAD
201// TODO: can we throw exceptions instead of termination?
202inline void thread_monitor::check( int error_code, const char* routine ) {
203 if( error_code ) {
204 fprintf(stderr,"thread_monitor %s in %s\n", strerror(error_code), routine );
205 exit(1);
206 }
207}
208
209inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, size_t stack_size ) {
210 // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched.
211 // Note that there are some tricky situations to deal with, such that the thread is already
212 // grabbed as part of an OpenMP team.
213 pthread_attr_t s;
214 check(pthread_attr_init( &s ), "pthread_attr_init");
215 if( stack_size>0 )
216 check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size" );
217 pthread_t handle;
218 check( pthread_create( &handle, &s, thread_routine, arg ), "pthread_create" );
219 check( pthread_attr_destroy( &s ), "pthread_attr_destroy" );
220 return handle;
221}
222
223void thread_monitor::join(handle_type handle) {
224 check(pthread_join(handle, NULL), "pthread_join");
225}
226
227void thread_monitor::detach_thread(handle_type handle) {
228 check(pthread_detach(handle), "pthread_detach");
229}
230
//! Yield the processor from the calling thread via sched_yield().
/** The return value of sched_yield() is ignored. */
inline void thread_monitor::yield() {
    sched_yield();
}
234#endif /* USE_PTHREAD */
235
236inline void thread_monitor::notify() {
237 my_cookie.my_epoch = my_cookie.my_epoch + 1;
238 bool do_signal = in_wait.fetch_and_store( false );
239 if( do_signal )
240 my_sema.V();
241}
242
//! First phase of a wait: record the current epoch in c and announce the wait.
/** If the previous wait was cancelled after a notify() had already signalled the
    semaphore, that pending signal is drained first so it cannot satisfy a later
    commit_wait(). */
inline void thread_monitor::prepare_wait( cookie& c ) {
    if( skipped_wakeup ) {
        // Lazily consume a signal that was skipped due to cancel_wait
        skipped_wakeup = false;
        my_sema.P(); // does not really wait on the semaphore
    }
    // Snapshot the epoch before publishing in_wait; commit_wait() compares against this copy.
    c = my_cookie;
    // Publish the wait with a full fence.
    // NOTE(review): presumably the fence keeps the epoch snapshot ordered before the flag
    // becomes visible to notify() -- confirm against the tbb::atomic documentation.
    in_wait.store<tbb::full_fence>( true );
}
252
253inline void thread_monitor::commit_wait( cookie& c ) {
254 bool do_it = ( c.my_epoch == my_cookie.my_epoch );
255 if( do_it ) my_sema.P();
256 else cancel_wait();
257}
258
//! Abandon a wait started by prepare_wait() without blocking.
/** Atomically clears in_wait. If the flag was already clear, notify() got there
    first and has signalled the semaphore; skipped_wakeup records that so the next
    prepare_wait() drains the pending signal. */
inline void thread_monitor::cancel_wait() {
    // if not in_wait, then some thread has sent us a signal;
    // it will be consumed by the next prepare_wait call
    skipped_wakeup = ! in_wait.fetch_and_store( false );
}
264
265} // namespace internal
266} // namespace rml
267
268#endif /* __RML_thread_monitor_H */
269