1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "macros.hpp" |
32 | #ifndef ZMQ_HAVE_WINDOWS |
33 | #include <unistd.h> |
34 | #endif |
35 | |
36 | #include <limits> |
37 | #include <climits> |
38 | #include <new> |
39 | #include <sstream> |
40 | #include <string.h> |
41 | |
42 | #include "ctx.hpp" |
43 | #include "socket_base.hpp" |
44 | #include "io_thread.hpp" |
45 | #include "reaper.hpp" |
46 | #include "pipe.hpp" |
47 | #include "err.hpp" |
48 | #include "msg.hpp" |
49 | #include "random.hpp" |
50 | |
51 | #ifdef ZMQ_HAVE_VMCI |
52 | #include <vmci_sockets.h> |
53 | #endif |
54 | |
55 | #ifdef ZMQ_USE_NSS |
56 | #include <nss.h> |
57 | #endif |
58 | |
59 | #ifdef ZMQ_USE_GNUTLS |
60 | #include <gnutls/gnutls.h> |
61 | #endif |
62 | |
//  Sentinel values stored in ctx_t::_tag.
//  GOOD is written by the constructor and is what check_tag () compares
//  against; BAD is written by the destructor so that a destroyed context
//  no longer passes check_tag ().
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
65 | |
66 | int clipped_maxsocket (int max_requested_) |
67 | { |
68 | if (max_requested_ >= zmq::poller_t::max_fds () |
69 | && zmq::poller_t::max_fds () != -1) |
70 | // -1 because we need room for the reaper mailbox. |
71 | max_requested_ = zmq::poller_t::max_fds () - 1; |
72 | |
73 | return max_requested_; |
74 | } |
75 | |
76 | zmq::ctx_t::ctx_t () : |
77 | _tag (ZMQ_CTX_TAG_VALUE_GOOD), |
78 | _starting (true), |
79 | _terminating (false), |
80 | _reaper (NULL), |
81 | _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), |
82 | _max_msgsz (INT_MAX), |
83 | _io_thread_count (ZMQ_IO_THREADS_DFLT), |
84 | _blocky (true), |
85 | _ipv6 (false), |
86 | _zero_copy (true) |
87 | { |
88 | #ifdef HAVE_FORK |
89 | _pid = getpid (); |
90 | #endif |
91 | #ifdef ZMQ_HAVE_VMCI |
92 | _vmci_fd = -1; |
93 | _vmci_family = -1; |
94 | #endif |
95 | |
96 | // Initialise crypto library, if needed. |
97 | zmq::random_open (); |
98 | |
99 | #ifdef ZMQ_USE_NSS |
100 | NSS_NoDB_Init (NULL); |
101 | #endif |
102 | |
103 | #ifdef ZMQ_USE_GNUTLS |
104 | gnutls_global_init (); |
105 | #endif |
106 | } |
107 | |
108 | bool zmq::ctx_t::check_tag () |
109 | { |
110 | return _tag == ZMQ_CTX_TAG_VALUE_GOOD; |
111 | } |
112 | |
113 | zmq::ctx_t::~ctx_t () |
114 | { |
115 | // Check that there are no remaining _sockets. |
116 | zmq_assert (_sockets.empty ()); |
117 | |
118 | // Ask I/O threads to terminate. If stop signal wasn't sent to I/O |
119 | // thread subsequent invocation of destructor would hang-up. |
120 | for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) { |
121 | _io_threads[i]->stop (); |
122 | } |
123 | |
124 | // Wait till I/O threads actually terminate. |
125 | for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) { |
126 | LIBZMQ_DELETE (_io_threads[i]); |
127 | } |
128 | |
129 | // Deallocate the reaper thread object. |
130 | LIBZMQ_DELETE (_reaper); |
131 | |
132 | // The mailboxes in _slots themselves were deallocated with their |
133 | // corresponding io_thread/socket objects. |
134 | |
135 | // De-initialise crypto library, if needed. |
136 | zmq::random_close (); |
137 | |
138 | #ifdef ZMQ_USE_NSS |
139 | NSS_Shutdown (); |
140 | #endif |
141 | |
142 | #ifdef ZMQ_USE_GNUTLS |
143 | gnutls_global_deinit (); |
144 | #endif |
145 | |
146 | // Remove the tag, so that the object is considered dead. |
147 | _tag = ZMQ_CTX_TAG_VALUE_BAD; |
148 | } |
149 | |
150 | bool zmq::ctx_t::valid () const |
151 | { |
152 | return _term_mailbox.valid (); |
153 | } |
154 | |
//  Terminates the context and destroys it.
//
//  Returns 0 on success; in that case the object has deleted itself and
//  the caller must not touch it any more. Returns -1 with errno == EINTR
//  if waiting for the reaper was interrupted; the context then stays alive
//  with _terminating set, and a later call resumes the wait without
//  re-sending the stop commands (see 'restarted' below).
int zmq::ctx_t::terminate ()
{
    _slot_sync.lock ();

    //  create_socket () refuses to work while _terminating is set, so the
    //  flag is cleared for the duration of the loop below and restored
    //  afterwards.
    bool save_terminating = _terminating;
    _terminating = false;

    //  Connect up any pending inproc connections, otherwise we will hang.
    //  A temporary PAIR socket is bound to each pending address and closed
    //  again. The loop works on a copy of the pending-connection container.
    //  NOTE(review): create_socket () takes _slot_sync as well while it is
    //  already held here - this presumably relies on the mutex being
    //  re-entrant; confirm against mutex_t.
    pending_connections_t copy = _pending_connections;
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
         p != end; ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        //  create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
        s->bind (p->first.c_str ());
        s->close ();
    }
    _terminating = save_terminating;

    //  The shutdown handshake is only run when the context was started
    //  (_starting is false).
    if (!_starting) {
#ifdef HAVE_FORK
        if (_pid != getpid ()) {
            //  we are a forked child process. Close all file descriptors
            //  inherited from the parent.
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->get_mailbox ()->forked ();

            _term_mailbox.forked ();
        }
#endif

        //  Check whether termination was already underway, but interrupted and now
        //  restarted.
        bool restarted = _terminating;
        _terminating = true;

        //  First attempt to terminate the context.
        if (!restarted) {
            //  First send stop command to sockets so that any blocking calls
            //  can be interrupted. If there are no sockets we can ask reaper
            //  thread to stop.
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->stop ();
            if (_sockets.empty ())
                _reaper->stop ();
        }
        //  The lock is released while blocking on the mailbox.
        _slot_sync.unlock ();

        //  Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = _term_mailbox.recv (&cmd, -1);
        //  Interrupted wait: return with the lock released and the context
        //  intact so that the call can be repeated.
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
        zmq_assert (cmd.type == command_t::done);
        //  Re-acquire the lock so that the unlock below is balanced on
        //  both paths.
        _slot_sync.lock ();
        zmq_assert (_sockets.empty ());
    }
    _slot_sync.unlock ();

#ifdef ZMQ_HAVE_VMCI
    //  Release the VMCI descriptor and reset the cached family/fd.
    _vmci_sync.lock ();

    VMCISock_ReleaseAFValueFd (_vmci_fd);
    _vmci_family = -1;
    _vmci_fd = -1;

    _vmci_sync.unlock ();
#endif

    //  Deallocate the resources. No member may be accessed after this line.
    delete this;

    return 0;
}
230 | |
231 | int zmq::ctx_t::shutdown () |
232 | { |
233 | scoped_lock_t locker (_slot_sync); |
234 | |
235 | if (!_starting && !_terminating) { |
236 | _terminating = true; |
237 | |
238 | // Send stop command to sockets so that any blocking calls |
239 | // can be interrupted. If there are no sockets we can ask reaper |
240 | // thread to stop. |
241 | for (sockets_t::size_type i = 0; i != _sockets.size (); i++) |
242 | _sockets[i]->stop (); |
243 | if (_sockets.empty ()) |
244 | _reaper->stop (); |
245 | } |
246 | |
247 | return 0; |
248 | } |
249 | |
250 | int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_) |
251 | { |
252 | bool is_int = (optvallen_ == sizeof (int)); |
253 | int value = 0; |
254 | if (is_int) |
255 | memcpy (&value, optval_, sizeof (int)); |
256 | |
257 | switch (option_) { |
258 | case ZMQ_MAX_SOCKETS: |
259 | if (is_int && value >= 1 && value == clipped_maxsocket (value)) { |
260 | scoped_lock_t locker (_opt_sync); |
261 | _max_sockets = value; |
262 | return 0; |
263 | } |
264 | break; |
265 | |
266 | case ZMQ_IO_THREADS: |
267 | if (is_int && value >= 0) { |
268 | scoped_lock_t locker (_opt_sync); |
269 | _io_thread_count = value; |
270 | return 0; |
271 | } |
272 | break; |
273 | |
274 | case ZMQ_IPV6: |
275 | if (is_int && value >= 0) { |
276 | scoped_lock_t locker (_opt_sync); |
277 | _ipv6 = (value != 0); |
278 | return 0; |
279 | } |
280 | break; |
281 | |
282 | case ZMQ_BLOCKY: |
283 | if (is_int && value >= 0) { |
284 | scoped_lock_t locker (_opt_sync); |
285 | _blocky = (value != 0); |
286 | return 0; |
287 | } |
288 | break; |
289 | |
290 | case ZMQ_MAX_MSGSZ: |
291 | if (is_int && value >= 0) { |
292 | scoped_lock_t locker (_opt_sync); |
293 | _max_msgsz = value < INT_MAX ? value : INT_MAX; |
294 | return 0; |
295 | } |
296 | break; |
297 | |
298 | case ZMQ_ZERO_COPY_RECV: |
299 | if (is_int && value >= 0) { |
300 | scoped_lock_t locker (_opt_sync); |
301 | _zero_copy = (value != 0); |
302 | return 0; |
303 | } |
304 | break; |
305 | |
306 | default: { |
307 | return thread_ctx_t::set (option_, optval_, optvallen_); |
308 | } |
309 | } |
310 | |
311 | errno = EINVAL; |
312 | return -1; |
313 | } |
314 | |
315 | int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_) |
316 | { |
317 | const bool is_int = (*optvallen_ == sizeof (int)); |
318 | int *value = static_cast<int *> (optval_); |
319 | |
320 | switch (option_) { |
321 | case ZMQ_MAX_SOCKETS: |
322 | if (is_int) { |
323 | *value = _max_sockets; |
324 | return 0; |
325 | } |
326 | break; |
327 | |
328 | case ZMQ_SOCKET_LIMIT: |
329 | if (is_int) { |
330 | *value = clipped_maxsocket (65535); |
331 | return 0; |
332 | } |
333 | break; |
334 | |
335 | case ZMQ_IO_THREADS: |
336 | if (is_int) { |
337 | *value = _io_thread_count; |
338 | return 0; |
339 | } |
340 | break; |
341 | |
342 | case ZMQ_IPV6: |
343 | if (is_int) { |
344 | *value = _ipv6; |
345 | return 0; |
346 | } |
347 | break; |
348 | |
349 | case ZMQ_BLOCKY: |
350 | if (is_int) { |
351 | *value = _blocky; |
352 | return 0; |
353 | } |
354 | break; |
355 | |
356 | case ZMQ_MAX_MSGSZ: |
357 | if (is_int) { |
358 | *value = _max_msgsz; |
359 | return 0; |
360 | } |
361 | break; |
362 | |
363 | case ZMQ_MSG_T_SIZE: |
364 | if (is_int) { |
365 | *value = sizeof (zmq_msg_t); |
366 | return 0; |
367 | } |
368 | break; |
369 | |
370 | case ZMQ_ZERO_COPY_RECV: |
371 | if (is_int) { |
372 | *value = _zero_copy; |
373 | return 0; |
374 | } |
375 | break; |
376 | |
377 | default: { |
378 | return thread_ctx_t::get (option_, optval_, optvallen_); |
379 | } |
380 | } |
381 | |
382 | errno = EINVAL; |
383 | return -1; |
384 | } |
385 | |
386 | int zmq::ctx_t::get (int option_) |
387 | { |
388 | int optval = 0; |
389 | size_t optvallen = sizeof (int); |
390 | |
391 | if (get (option_, &optval, &optvallen) == 0) |
392 | return optval; |
393 | |
394 | errno = EINVAL; |
395 | return -1; |
396 | } |
397 | |
//  Brings the context from the "starting" state into operation: sizes the
//  slot (mailbox) table, creates and starts the reaper thread and the
//  configured number of I/O threads, and builds the list of free slots
//  available to sockets.
//
//  Returns true on success and clears _starting. Returns false on failure;
//  errno is set to ENOMEM when an allocation failed (it is not assigned
//  here when a mailbox turns out to be invalid).
bool zmq::ctx_t::start ()
{
    //  Initialise the array of mailboxes. Additional two slots are for
    //  zmq_ctx_term thread and reaper thread.
    //  The options are snapshotted under _opt_sync.
    _opt_sync.lock ();
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets;
    const int ios = _io_thread_count;
    _opt_sync.unlock ();
    //  Slot layout: [term][reaper][I/O threads...][sockets...].
    int slot_count = mazmq + ios + term_and_reaper_threads_count;
    try {
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return false;
    }
    _slots.resize (term_and_reaper_threads_count);

    //  Initialise the infrastructure for zmq_ctx_term thread.
    _slots[term_tid] = &_term_mailbox;

    //  Create the reaper thread.
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!_reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();

    //  Create I/O thread objects and launch them.
    //  The table is grown to its full size first; unused entries are NULL.
    _slots.resize (slot_count, NULL);

    //  I/O thread i uses slot i, directly after the two fixed slots.
    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
        }
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
        io_thread->start ();
    }

    //  In the unused part of the slot array, create a list of empty slots.
    //  Indices are pushed from highest to lowest, so create_socket (), which
    //  takes from the back, hands out the lowest free index first.
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
        _empty_slots.push_back (i);
    }

    _starting = false;
    return true;

    //  Failure paths. Falling through from the first label into the second
    //  is intentional.
    //  NOTE(review): I/O threads that were already created and started
    //  remain in _io_threads on this path; only the reaper is torn down
    //  here and _slots is cleared.
fail_cleanup_reaper:
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;

fail_cleanup_slots:
    _slots.clear ();
    return false;
}
469 | |
470 | zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) |
471 | { |
472 | scoped_lock_t locker (_slot_sync); |
473 | |
474 | if (unlikely (_starting)) { |
475 | if (!start ()) |
476 | return NULL; |
477 | } |
478 | |
479 | // Once zmq_ctx_term() was called, we can't create new sockets. |
480 | if (_terminating) { |
481 | errno = ETERM; |
482 | return NULL; |
483 | } |
484 | |
485 | // If max_sockets limit was reached, return error. |
486 | if (_empty_slots.empty ()) { |
487 | errno = EMFILE; |
488 | return NULL; |
489 | } |
490 | |
491 | // Choose a slot for the socket. |
492 | uint32_t slot = _empty_slots.back (); |
493 | _empty_slots.pop_back (); |
494 | |
495 | // Generate new unique socket ID. |
496 | int sid = (static_cast<int> (max_socket_id.add (1))) + 1; |
497 | |
498 | // Create the socket and register its mailbox. |
499 | socket_base_t *s = socket_base_t::create (type_, this, slot, sid); |
500 | if (!s) { |
501 | _empty_slots.push_back (slot); |
502 | return NULL; |
503 | } |
504 | _sockets.push_back (s); |
505 | _slots[slot] = s->get_mailbox (); |
506 | |
507 | return s; |
508 | } |
509 | |
510 | void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) |
511 | { |
512 | scoped_lock_t locker (_slot_sync); |
513 | |
514 | // Free the associated thread slot. |
515 | uint32_t tid = socket_->get_tid (); |
516 | _empty_slots.push_back (tid); |
517 | _slots[tid] = NULL; |
518 | |
519 | // Remove the socket from the list of sockets. |
520 | _sockets.erase (socket_); |
521 | |
522 | // If zmq_ctx_term() was already called and there are no more socket |
523 | // we can ask reaper thread to terminate. |
524 | if (_terminating && _sockets.empty ()) |
525 | _reaper->stop (); |
526 | } |
527 | |
528 | zmq::object_t *zmq::ctx_t::get_reaper () |
529 | { |
530 | return _reaper; |
531 | } |
532 | |
533 | zmq::thread_ctx_t::thread_ctx_t () : |
534 | _thread_priority (ZMQ_THREAD_PRIORITY_DFLT), |
535 | _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) |
536 | { |
537 | } |
538 | |
539 | void zmq::thread_ctx_t::start_thread (thread_t &thread_, |
540 | thread_fn *tfn_, |
541 | void *arg_, |
542 | const char *name_) const |
543 | { |
544 | thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy, |
545 | _thread_affinity_cpus); |
546 | |
547 | char namebuf[16] = "" ; |
548 | snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s" , |
549 | _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (), |
550 | _thread_name_prefix.empty () ? "" : "/" , name_ ? "/" : "" , |
551 | name_ ? name_ : "" ); |
552 | thread_.start (tfn_, arg_, namebuf); |
553 | } |
554 | |
555 | int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_) |
556 | { |
557 | bool is_int = (optvallen_ == sizeof (int)); |
558 | int value = 0; |
559 | if (is_int) |
560 | memcpy (&value, optval_, sizeof (int)); |
561 | |
562 | switch (option_) { |
563 | case ZMQ_THREAD_SCHED_POLICY: |
564 | if (is_int && value >= 0) { |
565 | scoped_lock_t locker (_opt_sync); |
566 | _thread_sched_policy = value; |
567 | return 0; |
568 | } |
569 | break; |
570 | |
571 | case ZMQ_THREAD_AFFINITY_CPU_ADD: |
572 | if (is_int && value >= 0) { |
573 | scoped_lock_t locker (_opt_sync); |
574 | _thread_affinity_cpus.insert (value); |
575 | return 0; |
576 | } |
577 | break; |
578 | |
579 | case ZMQ_THREAD_AFFINITY_CPU_REMOVE: |
580 | if (is_int && value >= 0) { |
581 | scoped_lock_t locker (_opt_sync); |
582 | if (0 == _thread_affinity_cpus.erase (value)) { |
583 | errno = EINVAL; |
584 | return -1; |
585 | } |
586 | return 0; |
587 | } |
588 | break; |
589 | |
590 | case ZMQ_THREAD_PRIORITY: |
591 | if (is_int && value >= 0) { |
592 | scoped_lock_t locker (_opt_sync); |
593 | _thread_priority = value; |
594 | return 0; |
595 | } |
596 | break; |
597 | |
598 | case ZMQ_THREAD_NAME_PREFIX: |
599 | // start_thread() allows max 16 chars for thread name |
600 | if (is_int) { |
601 | std::ostringstream s; |
602 | s << value; |
603 | scoped_lock_t locker (_opt_sync); |
604 | _thread_name_prefix = s.str (); |
605 | return 0; |
606 | } else if (optvallen_ > 0 && optvallen_ <= 16) { |
607 | scoped_lock_t locker (_opt_sync); |
608 | _thread_name_prefix.assign (static_cast<const char *> (optval_), |
609 | optvallen_); |
610 | return 0; |
611 | } |
612 | break; |
613 | } |
614 | |
615 | errno = EINVAL; |
616 | return -1; |
617 | } |
618 | |
619 | int zmq::thread_ctx_t::get (int option_, |
620 | void *optval_, |
621 | const size_t *optvallen_) |
622 | { |
623 | const bool is_int = (*optvallen_ == sizeof (int)); |
624 | int *value = static_cast<int *> (optval_); |
625 | |
626 | switch (option_) { |
627 | case ZMQ_THREAD_SCHED_POLICY: |
628 | if (is_int) { |
629 | scoped_lock_t locker (_opt_sync); |
630 | *value = _thread_sched_policy; |
631 | return 0; |
632 | } |
633 | break; |
634 | |
635 | case ZMQ_THREAD_NAME_PREFIX: |
636 | if (is_int) { |
637 | scoped_lock_t locker (_opt_sync); |
638 | *value = atoi (_thread_name_prefix.c_str ()); |
639 | return 0; |
640 | } else if (*optvallen_ >= _thread_name_prefix.size ()) { |
641 | scoped_lock_t locker (_opt_sync); |
642 | memcpy (optval_, _thread_name_prefix.data (), |
643 | _thread_name_prefix.size ()); |
644 | return 0; |
645 | } |
646 | break; |
647 | } |
648 | |
649 | errno = EINVAL; |
650 | return -1; |
651 | } |
652 | |
653 | void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) |
654 | { |
655 | _slots[tid_]->send (command_); |
656 | } |
657 | |
658 | zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) |
659 | { |
660 | if (_io_threads.empty ()) |
661 | return NULL; |
662 | |
663 | // Find the I/O thread with minimum load. |
664 | int min_load = -1; |
665 | io_thread_t *selected_io_thread = NULL; |
666 | for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) { |
667 | if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { |
668 | int load = _io_threads[i]->get_load (); |
669 | if (selected_io_thread == NULL || load < min_load) { |
670 | min_load = load; |
671 | selected_io_thread = _io_threads[i]; |
672 | } |
673 | } |
674 | } |
675 | return selected_io_thread; |
676 | } |
677 | |
678 | int zmq::ctx_t::register_endpoint (const char *addr_, |
679 | const endpoint_t &endpoint_) |
680 | { |
681 | scoped_lock_t locker (_endpoints_sync); |
682 | |
683 | const bool inserted = |
684 | _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_) |
685 | .second; |
686 | if (!inserted) { |
687 | errno = EADDRINUSE; |
688 | return -1; |
689 | } |
690 | return 0; |
691 | } |
692 | |
693 | int zmq::ctx_t::unregister_endpoint (const std::string &addr_, |
694 | socket_base_t *socket_) |
695 | { |
696 | scoped_lock_t locker (_endpoints_sync); |
697 | |
698 | const endpoints_t::iterator it = _endpoints.find (addr_); |
699 | if (it == _endpoints.end () || it->second.socket != socket_) { |
700 | errno = ENOENT; |
701 | return -1; |
702 | } |
703 | |
704 | // Remove endpoint. |
705 | _endpoints.erase (it); |
706 | |
707 | return 0; |
708 | } |
709 | |
710 | void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) |
711 | { |
712 | scoped_lock_t locker (_endpoints_sync); |
713 | |
714 | for (endpoints_t::iterator it = _endpoints.begin (), |
715 | end = _endpoints.end (); |
716 | it != end;) { |
717 | if (it->second.socket == socket_) |
718 | #if __cplusplus >= 201103L |
719 | it = _endpoints.erase (it); |
720 | #else |
721 | _endpoints.erase (it++); |
722 | #endif |
723 | else |
724 | ++it; |
725 | } |
726 | } |
727 | |
728 | zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) |
729 | { |
730 | scoped_lock_t locker (_endpoints_sync); |
731 | |
732 | endpoints_t::iterator it = _endpoints.find (addr_); |
733 | if (it == _endpoints.end ()) { |
734 | errno = ECONNREFUSED; |
735 | endpoint_t empty = {NULL, options_t ()}; |
736 | return empty; |
737 | } |
738 | endpoint_t endpoint = it->second; |
739 | |
740 | // Increment the command sequence number of the peer so that it won't |
741 | // get deallocated until "bind" command is issued by the caller. |
742 | // The subsequent 'bind' has to be called with inc_seqnum parameter |
743 | // set to false, so that the seqnum isn't incremented twice. |
744 | endpoint.socket->inc_seqnum (); |
745 | |
746 | return endpoint; |
747 | } |
748 | |
749 | void zmq::ctx_t::pend_connection (const std::string &addr_, |
750 | const endpoint_t &endpoint_, |
751 | pipe_t **pipes_) |
752 | { |
753 | scoped_lock_t locker (_endpoints_sync); |
754 | |
755 | const pending_connection_t pending_connection = {endpoint_, pipes_[0], |
756 | pipes_[1]}; |
757 | |
758 | endpoints_t::iterator it = _endpoints.find (addr_); |
759 | if (it == _endpoints.end ()) { |
760 | // Still no bind. |
761 | endpoint_.socket->inc_seqnum (); |
762 | _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_, |
763 | pending_connection); |
764 | } else { |
765 | // Bind has happened in the mean time, connect directly |
766 | connect_inproc_sockets (it->second.socket, it->second.options, |
767 | pending_connection, connect_side); |
768 | } |
769 | } |
770 | |
771 | void zmq::ctx_t::connect_pending (const char *addr_, |
772 | zmq::socket_base_t *bind_socket_) |
773 | { |
774 | scoped_lock_t locker (_endpoints_sync); |
775 | |
776 | std::pair<pending_connections_t::iterator, pending_connections_t::iterator> |
777 | pending = _pending_connections.equal_range (addr_); |
778 | for (pending_connections_t::iterator p = pending.first; p != pending.second; |
779 | ++p) |
780 | connect_inproc_sockets (bind_socket_, _endpoints[addr_].options, |
781 | p->second, bind_side); |
782 | |
783 | _pending_connections.erase (pending.first, pending.second); |
784 | } |
785 | |
786 | void zmq::ctx_t::connect_inproc_sockets ( |
787 | zmq::socket_base_t *bind_socket_, |
788 | options_t &bind_options_, |
789 | const pending_connection_t &pending_connection_, |
790 | side side_) |
791 | { |
792 | bind_socket_->inc_seqnum (); |
793 | pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ()); |
794 | |
795 | if (!bind_options_.recv_routing_id) { |
796 | msg_t msg; |
797 | const bool ok = pending_connection_.bind_pipe->read (&msg); |
798 | zmq_assert (ok); |
799 | const int rc = msg.close (); |
800 | errno_assert (rc == 0); |
801 | } |
802 | |
803 | if (!get_effective_conflate_option (pending_connection_.endpoint.options)) { |
804 | pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm, |
805 | bind_options_.rcvhwm); |
806 | pending_connection_.bind_pipe->set_hwms_boost ( |
807 | pending_connection_.endpoint.options.sndhwm, |
808 | pending_connection_.endpoint.options.rcvhwm); |
809 | |
810 | pending_connection_.connect_pipe->set_hwms ( |
811 | pending_connection_.endpoint.options.rcvhwm, |
812 | pending_connection_.endpoint.options.sndhwm); |
813 | pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm, |
814 | bind_options_.sndhwm); |
815 | } else { |
816 | pending_connection_.connect_pipe->set_hwms (-1, -1); |
817 | pending_connection_.bind_pipe->set_hwms (-1, -1); |
818 | } |
819 | |
820 | if (side_ == bind_side) { |
821 | command_t cmd; |
822 | cmd.type = command_t::bind; |
823 | cmd.args.bind.pipe = pending_connection_.bind_pipe; |
824 | bind_socket_->process_command (cmd); |
825 | bind_socket_->send_inproc_connected ( |
826 | pending_connection_.endpoint.socket); |
827 | } else |
828 | pending_connection_.connect_pipe->send_bind ( |
829 | bind_socket_, pending_connection_.bind_pipe, false); |
830 | |
831 | // When a ctx is terminated all pending inproc connection will be |
832 | // connected, but the socket will already be closed and the pipe will be |
833 | // in waiting_for_delimiter state, which means no more writes can be done |
834 | // and the routing id write fails and causes an assert. Check if the socket |
835 | // is open before sending. |
836 | if (pending_connection_.endpoint.options.recv_routing_id |
837 | && pending_connection_.endpoint.socket->check_tag ()) { |
838 | send_routing_id (pending_connection_.bind_pipe, bind_options_); |
839 | } |
840 | } |
841 | |
842 | #ifdef ZMQ_HAVE_VMCI |
843 | |
844 | int zmq::ctx_t::get_vmci_socket_family () |
845 | { |
846 | zmq::scoped_lock_t locker (_vmci_sync); |
847 | |
848 | if (_vmci_fd == -1) { |
849 | _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd); |
850 | |
851 | if (_vmci_fd != -1) { |
852 | #ifdef FD_CLOEXEC |
853 | int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC); |
854 | errno_assert (rc != -1); |
855 | #endif |
856 | } |
857 | } |
858 | |
859 | return _vmci_family; |
860 | } |
861 | |
862 | #endif |
863 | |
//  The last used socket ID, or 0 if no socket was used so far. Note that this
//  is a global variable. Thus, even sockets created in different contexts have
//  unique IDs. create_socket () advances this counter for every socket it
//  attempts to create.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
868 | |