1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "thread.hpp"
33#include "err.hpp"
34
35#ifdef ZMQ_HAVE_WINDOWS
36#include <winnt.h>
37#endif
38
39bool zmq::thread_t::get_started () const
40{
41 return _started;
42}
43
44#ifdef ZMQ_HAVE_WINDOWS
45
46extern "C" {
47#if defined _WIN32_WCE
48static DWORD thread_routine (LPVOID arg_)
49#else
50static unsigned int __stdcall thread_routine (void *arg_)
51#endif
52{
53 zmq::thread_t *self = (zmq::thread_t *) arg_;
54 self->applyThreadName ();
55 self->_tfn (self->_arg);
56 return 0;
57}
58}
59
60void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
61{
62 _tfn = tfn_;
63 _arg = arg_;
64 if (name_)
65 strncpy (_name, name_, sizeof (_name) - 1);
66#if defined _WIN32_WCE
67 _descriptor =
68 (HANDLE) CreateThread (NULL, 0, &::thread_routine, this, 0, &_thread_id);
69#else
70 _descriptor = (HANDLE) _beginthreadex (NULL, 0, &::thread_routine, this, 0,
71 &_thread_id);
72#endif
73 win_assert (_descriptor != NULL);
74 _started = true;
75}
76
77bool zmq::thread_t::is_current_thread () const
78{
79 return GetCurrentThreadId () == _thread_id;
80}
81
82void zmq::thread_t::stop ()
83{
84 if (_started) {
85 DWORD rc = WaitForSingleObject (_descriptor, INFINITE);
86 win_assert (rc != WAIT_FAILED);
87 BOOL rc2 = CloseHandle (_descriptor);
88 win_assert (rc2 != 0);
89 }
90}
91
92void zmq::thread_t::setSchedulingParameters (
93 int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
94{
95 // not implemented
96 LIBZMQ_UNUSED (priority_);
97 LIBZMQ_UNUSED (scheduling_policy_);
98 LIBZMQ_UNUSED (affinity_cpus_);
99}
100
101void zmq::thread_t::
102 applySchedulingParameters () // to be called in secondary thread context
103{
104 // not implemented
105}
106
107namespace
108{
109#pragma pack(push, 8)
110struct thread_info_t
111{
112 DWORD _type;
113 LPCSTR _name;
114 DWORD _thread_id;
115 DWORD _flags;
116};
117#pragma pack(pop)
118}
119
120struct MY_EXCEPTION_REGISTRATION_RECORD
121{
122 typedef EXCEPTION_DISPOSITION (NTAPI *HandlerFunctionType) (
123 EXCEPTION_RECORD *, void *, CONTEXT *, void *);
124
125 MY_EXCEPTION_REGISTRATION_RECORD *Next;
126 HandlerFunctionType Handler;
127};
128
129static EXCEPTION_DISPOSITION NTAPI continue_execution (EXCEPTION_RECORD *rec,
130 void *frame,
131 CONTEXT *ctx,
132 void *disp)
133{
134 return ExceptionContinueExecution;
135}
136
137void zmq::thread_t::
138 applyThreadName () // to be called in secondary thread context
139{
140 if (!_name[0] || !IsDebuggerPresent ())
141 return;
142
143 thread_info_t thread_info;
144 thread_info._type = 0x1000;
145 thread_info._name = _name;
146 thread_info._thread_id = -1;
147 thread_info._flags = 0;
148
149 NT_TIB *tib = ((NT_TIB *) NtCurrentTeb ());
150
151 MY_EXCEPTION_REGISTRATION_RECORD rec;
152 rec.Next = (MY_EXCEPTION_REGISTRATION_RECORD *) tib->ExceptionList;
153 rec.Handler = continue_execution;
154
155 // push our handler, raise, and finally pop our handler
156 tib->ExceptionList = (_EXCEPTION_REGISTRATION_RECORD *) &rec;
157 DWORD MS_VC_EXCEPTION = 0x406D1388;
158 RaiseException (MS_VC_EXCEPTION, 0,
159 sizeof (thread_info) / sizeof (ULONG_PTR),
160 (ULONG_PTR *) &thread_info);
161 tib->ExceptionList =
162 (_EXCEPTION_REGISTRATION_RECORD
163 *) (((MY_EXCEPTION_REGISTRATION_RECORD *) tib->ExceptionList)->Next);
164}
165
166#elif defined ZMQ_HAVE_VXWORKS
167
168extern "C" {
169static void *thread_routine (void *arg_)
170{
171 zmq::thread_t *self = (zmq::thread_t *) arg_;
172 self->applySchedulingParameters ();
173 self->_tfn (self->_arg);
174 return NULL;
175}
176}
177
178void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
179{
180 LIBZMQ_UNUSED (name_);
181 _tfn = tfn_;
182 _arg = arg_;
183 _descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
184 DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
185 (int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);
186 if (_descriptor != NULL || _descriptor > 0)
187 _started = true;
188}
189
190void zmq::thread_t::stop ()
191{
192 if (_started)
193 while ((_descriptor != NULL || _descriptor > 0)
194 && taskIdVerify (_descriptor) == 0) {
195 }
196}
197
198bool zmq::thread_t::is_current_thread () const
199{
200 return taskIdSelf () == _descriptor;
201}
202
203void zmq::thread_t::setSchedulingParameters (
204 int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
205{
206 _thread_priority = priority_;
207 _thread_sched_policy = schedulingPolicy_;
208 _thread_affinity_cpus = affinity_cpus_;
209}
210
211void zmq::thread_t::
212 applySchedulingParameters () // to be called in secondary thread context
213{
214 int priority =
215 (_thread_priority >= 0 ? _thread_priority : DEFAULT_PRIORITY);
216 priority = (priority < UCHAR_MAX ? priority : DEFAULT_PRIORITY);
217 if (_descriptor != NULL || _descriptor > 0) {
218 taskPrioritySet (_descriptor, priority);
219 }
220}
221
222void zmq::thread_t::
223 applyThreadName () // to be called in secondary thread context
224{
225 // not implemented
226}
227
228#else
229
230#include <signal.h>
231#include <unistd.h>
232#include <sys/time.h>
233#include <sys/resource.h>
234
235extern "C" {
236static void *thread_routine (void *arg_)
237{
238#if !defined ZMQ_HAVE_OPENVMS && !defined ZMQ_HAVE_ANDROID
239 // Following code will guarantee more predictable latencies as it'll
240 // disallow any signal handling in the I/O thread.
241 sigset_t signal_set;
242 int rc = sigfillset (&signal_set);
243 errno_assert (rc == 0);
244 rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
245 posix_assert (rc);
246#endif
247 zmq::thread_t *self = (zmq::thread_t *) arg_;
248 self->applySchedulingParameters ();
249 self->applyThreadName ();
250 self->_tfn (self->_arg);
251 return NULL;
252}
253}
254
255void zmq::thread_t::start (thread_fn *tfn_, void *arg_, const char *name_)
256{
257 _tfn = tfn_;
258 _arg = arg_;
259 if (name_)
260 strncpy (_name, name_, sizeof (_name) - 1);
261 int rc = pthread_create (&_descriptor, NULL, thread_routine, this);
262 posix_assert (rc);
263 _started = true;
264}
265
266void zmq::thread_t::stop ()
267{
268 if (_started) {
269 int rc = pthread_join (_descriptor, NULL);
270 posix_assert (rc);
271 }
272}
273
274bool zmq::thread_t::is_current_thread () const
275{
276 return bool(pthread_equal (pthread_self (), _descriptor));
277}
278
279void zmq::thread_t::setSchedulingParameters (
280 int priority_, int scheduling_policy_, const std::set<int> &affinity_cpus_)
281{
282 _thread_priority = priority_;
283 _thread_sched_policy = scheduling_policy_;
284 _thread_affinity_cpus = affinity_cpus_;
285}
286
287void zmq::thread_t::
288 applySchedulingParameters () // to be called in secondary thread context
289{
290#if defined _POSIX_THREAD_PRIORITY_SCHEDULING \
291 && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
292 int policy = 0;
293 struct sched_param param;
294
295#if _POSIX_THREAD_PRIORITY_SCHEDULING == 0 \
296 && defined _SC_THREAD_PRIORITY_SCHEDULING
297 if (sysconf (_SC_THREAD_PRIORITY_SCHEDULING) < 0) {
298 return;
299 }
300#endif
301 int rc = pthread_getschedparam (pthread_self (), &policy, &param);
302 posix_assert (rc);
303
304 if (_thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT) {
305 policy = _thread_sched_policy;
306 }
307
308 /* Quoting docs:
309 "Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
310 SCHED_RR policies, and the priority 0 for the remaining policies."
311 Other policies may use the "nice value" in place of the priority:
312 */
313 bool use_nice_instead_priority =
314 (policy != SCHED_FIFO) && (policy != SCHED_RR);
315
316 if (_thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
317 if (use_nice_instead_priority)
318 param.sched_priority =
319 0; // this is the only supported priority for most scheduling policies
320 else
321 param.sched_priority =
322 _thread_priority; // user should provide a value between 1 and 99
323 }
324
325#ifdef __NetBSD__
326 if (policy == SCHED_OTHER)
327 param.sched_priority = -1;
328#endif
329
330 rc = pthread_setschedparam (pthread_self (), policy, &param);
331
332#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
333 // If this feature is unavailable at run-time, don't abort.
334 if (rc == ENOSYS)
335 return;
336#endif
337
338 posix_assert (rc);
339
340#if !defined ZMQ_HAVE_VXWORKS
341 if (use_nice_instead_priority
342 && _thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
343 // assume the user wants to decrease the thread's nice value
344 // i.e., increase the chance of this thread being scheduled: try setting that to
345 // maximum priority.
346 rc = nice (-20);
347
348 errno_assert (rc != -1);
349 // IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
350 // CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
351 }
352#endif
353
354#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
355 if (!_thread_affinity_cpus.empty ()) {
356 cpu_set_t cpuset;
357 CPU_ZERO (&cpuset);
358 for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin ();
359 it != _thread_affinity_cpus.end (); it++) {
360 CPU_SET ((int) (*it), &cpuset);
361 }
362 rc =
363 pthread_setaffinity_np (pthread_self (), sizeof (cpu_set_t), &cpuset);
364 posix_assert (rc);
365 }
366#endif
367#endif
368}
369
370void zmq::thread_t::
371 applyThreadName () // to be called in secondary thread context
372{
373 /* The thread name is a cosmetic string, added to ease debugging of
374 * multi-threaded applications. It is not a big issue if this value
375 * can not be set for any reason (such as Permission denied in some
376 * cases where the application changes its EUID, etc.) The value of
377 * "int rc" is retained where available, to help debuggers stepping
378 * through code to see its value - but otherwise it is ignored.
379 */
380 if (!_name[0])
381 return;
382
383 /* Fails with permission denied on Android 5/6 */
384#if defined(ZMQ_HAVE_ANDROID)
385 return;
386#endif
387
388#if defined(ZMQ_HAVE_PTHREAD_SETNAME_1)
389 int rc = pthread_setname_np (_name);
390 if (rc)
391 return;
392#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_2)
393 int rc = pthread_setname_np (pthread_self (), _name);
394 if (rc)
395 return;
396#elif defined(ZMQ_HAVE_PTHREAD_SETNAME_3)
397 int rc = pthread_setname_np (pthread_self (), _name, NULL);
398 if (rc)
399 return;
400#elif defined(ZMQ_HAVE_PTHREAD_SET_NAME)
401 pthread_set_name_np (pthread_self (), _name);
402#endif
403}
404
405#endif
406