1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#ifndef ZMQ_HAVE_WINDOWS
33#include <unistd.h>
34#endif
35
36#include <limits>
37#include <climits>
38#include <new>
39#include <sstream>
40#include <string.h>
41
42#include "ctx.hpp"
43#include "socket_base.hpp"
44#include "io_thread.hpp"
45#include "reaper.hpp"
46#include "pipe.hpp"
47#include "err.hpp"
48#include "msg.hpp"
49#include "random.hpp"
50
51#ifdef ZMQ_HAVE_VMCI
52#include <vmci_sockets.h>
53#endif
54
55#ifdef ZMQ_USE_NSS
56#include <nss.h>
57#endif
58
59#ifdef ZMQ_USE_GNUTLS
60#include <gnutls/gnutls.h>
61#endif
62
63#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
64#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
65
66int clipped_maxsocket (int max_requested_)
67{
68 if (max_requested_ >= zmq::poller_t::max_fds ()
69 && zmq::poller_t::max_fds () != -1)
70 // -1 because we need room for the reaper mailbox.
71 max_requested_ = zmq::poller_t::max_fds () - 1;
72
73 return max_requested_;
74}
75
// Construct a context with default options. No threads are started here;
// that happens lazily in start () when the first socket is created.
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#ifdef HAVE_FORK
    // Remember the creating process so a forked child can be detected
    // in terminate ().
    _pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
    // VMCI fd/family are resolved on demand in get_vmci_socket_family ().
    _vmci_fd = -1;
    _vmci_family = -1;
#endif

    // Initialise crypto library, if needed.
    zmq::random_open ();

#ifdef ZMQ_USE_NSS
    // Initialise NSS without a certificate database.
    NSS_NoDB_Init (NULL);
#endif

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
}
107
108bool zmq::ctx_t::check_tag ()
109{
110 return _tag == ZMQ_CTX_TAG_VALUE_GOOD;
111}
112
// Tear the context down. Reached via terminate ()'s `delete this`, by
// which point all sockets must already have been reaped.
zmq::ctx_t::~ctx_t ()
{
    // Check that there are no remaining _sockets.
    zmq_assert (_sockets.empty ());

    // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
    // thread subsequent invocation of destructor would hang-up.
    for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
        _io_threads[i]->stop ();
    }

    // Wait till I/O threads actually terminate.
    for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
        LIBZMQ_DELETE (_io_threads[i]);
    }

    // Deallocate the reaper thread object.
    LIBZMQ_DELETE (_reaper);

    // The mailboxes in _slots themselves were deallocated with their
    // corresponding io_thread/socket objects.

    // De-initialise crypto library, if needed.
    zmq::random_close ();

#ifdef ZMQ_USE_NSS
    NSS_Shutdown ();
#endif

#ifdef ZMQ_USE_GNUTLS
    gnutls_global_deinit ();
#endif

    // Remove the tag, so that the object is considered dead.
    _tag = ZMQ_CTX_TAG_VALUE_BAD;
}
149
// A context is only usable if its zmq_ctx_term mailbox was successfully
// created.
bool zmq::ctx_t::valid () const
{
    return _term_mailbox.valid ();
}
154
// Terminate the context: flush pending inproc connects, stop all sockets,
// wait for the reaper to close them, then deallocate. Returns -1 with
// errno == EINTR if the wait was interrupted (caller may retry).
int zmq::ctx_t::terminate ()
{
    _slot_sync.lock ();

    // Temporarily clear the terminating flag so the PAIR sockets created
    // below (to flush pending inproc connects) are allowed.
    bool save_terminating = _terminating;
    _terminating = false;

    // Connect up any pending inproc connections, otherwise we will hang
    pending_connections_t copy = _pending_connections;
    for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
         p != end; ++p) {
        zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
        // create_socket might fail eg: out of memory/sockets limit reached
        zmq_assert (s);
        // NOTE(review): the return value of bind is ignored here; a failed
        // bind would leave this pending connection unfinished — confirm
        // that is acceptable on the termination path.
        s->bind (p->first.c_str ());
        s->close ();
    }
    _terminating = save_terminating;

    if (!_starting) {
#ifdef HAVE_FORK
        if (_pid != getpid ()) {
            // we are a forked child process. Close all file descriptors
            // inherited from the parent.
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->get_mailbox ()->forked ();

            _term_mailbox.forked ();
        }
#endif

        // Check whether termination was already underway, but interrupted and now
        // restarted.
        bool restarted = _terminating;
        _terminating = true;

        // First attempt to terminate the context.
        if (!restarted) {
            // First send stop command to sockets so that any blocking calls
            // can be interrupted. If there are no sockets we can ask reaper
            // thread to stop.
            for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
                _sockets[i]->stop ();
            if (_sockets.empty ())
                _reaper->stop ();
        }
        // Release the slot lock while blocking below: destroy_socket ()
        // also takes _slot_sync and must be able to run meanwhile.
        _slot_sync.unlock ();

        // Wait till reaper thread closes all the sockets.
        command_t cmd;
        int rc = _term_mailbox.recv (&cmd, -1);
        if (rc == -1 && errno == EINTR)
            return -1;
        errno_assert (rc == 0);
        zmq_assert (cmd.type == command_t::done);
        _slot_sync.lock ();
        zmq_assert (_sockets.empty ());
    }
    _slot_sync.unlock ();

#ifdef ZMQ_HAVE_VMCI
    _vmci_sync.lock ();

    // Return the cached VMCI address-family fd (see
    // get_vmci_socket_family ()).
    VMCISock_ReleaseAFValueFd (_vmci_fd);
    _vmci_family = -1;
    _vmci_fd = -1;

    _vmci_sync.unlock ();
#endif

    // Deallocate the resources.
    delete this;

    return 0;
}
230
231int zmq::ctx_t::shutdown ()
232{
233 scoped_lock_t locker (_slot_sync);
234
235 if (!_starting && !_terminating) {
236 _terminating = true;
237
238 // Send stop command to sockets so that any blocking calls
239 // can be interrupted. If there are no sockets we can ask reaper
240 // thread to stop.
241 for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
242 _sockets[i]->stop ();
243 if (_sockets.empty ())
244 _reaper->stop ();
245 }
246
247 return 0;
248}
249
250int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
251{
252 bool is_int = (optvallen_ == sizeof (int));
253 int value = 0;
254 if (is_int)
255 memcpy (&value, optval_, sizeof (int));
256
257 switch (option_) {
258 case ZMQ_MAX_SOCKETS:
259 if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
260 scoped_lock_t locker (_opt_sync);
261 _max_sockets = value;
262 return 0;
263 }
264 break;
265
266 case ZMQ_IO_THREADS:
267 if (is_int && value >= 0) {
268 scoped_lock_t locker (_opt_sync);
269 _io_thread_count = value;
270 return 0;
271 }
272 break;
273
274 case ZMQ_IPV6:
275 if (is_int && value >= 0) {
276 scoped_lock_t locker (_opt_sync);
277 _ipv6 = (value != 0);
278 return 0;
279 }
280 break;
281
282 case ZMQ_BLOCKY:
283 if (is_int && value >= 0) {
284 scoped_lock_t locker (_opt_sync);
285 _blocky = (value != 0);
286 return 0;
287 }
288 break;
289
290 case ZMQ_MAX_MSGSZ:
291 if (is_int && value >= 0) {
292 scoped_lock_t locker (_opt_sync);
293 _max_msgsz = value < INT_MAX ? value : INT_MAX;
294 return 0;
295 }
296 break;
297
298 case ZMQ_ZERO_COPY_RECV:
299 if (is_int && value >= 0) {
300 scoped_lock_t locker (_opt_sync);
301 _zero_copy = (value != 0);
302 return 0;
303 }
304 break;
305
306 default: {
307 return thread_ctx_t::set (option_, optval_, optvallen_);
308 }
309 }
310
311 errno = EINVAL;
312 return -1;
313}
314
315int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_)
316{
317 const bool is_int = (*optvallen_ == sizeof (int));
318 int *value = static_cast<int *> (optval_);
319
320 switch (option_) {
321 case ZMQ_MAX_SOCKETS:
322 if (is_int) {
323 *value = _max_sockets;
324 return 0;
325 }
326 break;
327
328 case ZMQ_SOCKET_LIMIT:
329 if (is_int) {
330 *value = clipped_maxsocket (65535);
331 return 0;
332 }
333 break;
334
335 case ZMQ_IO_THREADS:
336 if (is_int) {
337 *value = _io_thread_count;
338 return 0;
339 }
340 break;
341
342 case ZMQ_IPV6:
343 if (is_int) {
344 *value = _ipv6;
345 return 0;
346 }
347 break;
348
349 case ZMQ_BLOCKY:
350 if (is_int) {
351 *value = _blocky;
352 return 0;
353 }
354 break;
355
356 case ZMQ_MAX_MSGSZ:
357 if (is_int) {
358 *value = _max_msgsz;
359 return 0;
360 }
361 break;
362
363 case ZMQ_MSG_T_SIZE:
364 if (is_int) {
365 *value = sizeof (zmq_msg_t);
366 return 0;
367 }
368 break;
369
370 case ZMQ_ZERO_COPY_RECV:
371 if (is_int) {
372 *value = _zero_copy;
373 return 0;
374 }
375 break;
376
377 default: {
378 return thread_ctx_t::get (option_, optval_, optvallen_);
379 }
380 }
381
382 errno = EINVAL;
383 return -1;
384}
385
386int zmq::ctx_t::get (int option_)
387{
388 int optval = 0;
389 size_t optvallen = sizeof (int);
390
391 if (get (option_, &optval, &optvallen) == 0)
392 return optval;
393
394 errno = EINVAL;
395 return -1;
396}
397
// Lazily allocate the slot/mailbox infrastructure and launch the reaper
// and I/O threads. Called from create_socket () with _slot_sync held.
// Returns false and sets errno (ENOMEM) on failure.
bool zmq::ctx_t::start ()
{
    // Initialise the array of mailboxes. Additional two slots are for
    // zmq_ctx_term thread and reaper thread.
    _opt_sync.lock ();
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets;
    const int ios = _io_thread_count;
    _opt_sync.unlock ();
    int slot_count = mazmq + ios + term_and_reaper_threads_count;
    try {
        // Reserve everything up front so later resize/push_back on these
        // vectors cannot throw.
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return false;
    }
    _slots.resize (term_and_reaper_threads_count);

    // Initialise the infrastructure for zmq_ctx_term thread.
    _slots[term_tid] = &_term_mailbox;

    // Create the reaper thread.
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!_reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();

    // Create I/O thread objects and launch them.
    _slots.resize (slot_count, NULL);

    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            // NOTE(review): any io_threads already started above are not
            // stopped/deleted on this path — confirm whether that leak is
            // acceptable on OOM.
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
        }
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
        io_thread->start ();
    }

    // In the unused part of the slot array, create a list of empty slots.
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
        _empty_slots.push_back (i);
    }

    _starting = false;
    return true;

fail_cleanup_reaper:
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;

fail_cleanup_slots:
    _slots.clear ();
    return false;
}
469
470zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
471{
472 scoped_lock_t locker (_slot_sync);
473
474 if (unlikely (_starting)) {
475 if (!start ())
476 return NULL;
477 }
478
479 // Once zmq_ctx_term() was called, we can't create new sockets.
480 if (_terminating) {
481 errno = ETERM;
482 return NULL;
483 }
484
485 // If max_sockets limit was reached, return error.
486 if (_empty_slots.empty ()) {
487 errno = EMFILE;
488 return NULL;
489 }
490
491 // Choose a slot for the socket.
492 uint32_t slot = _empty_slots.back ();
493 _empty_slots.pop_back ();
494
495 // Generate new unique socket ID.
496 int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
497
498 // Create the socket and register its mailbox.
499 socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
500 if (!s) {
501 _empty_slots.push_back (slot);
502 return NULL;
503 }
504 _sockets.push_back (s);
505 _slots[slot] = s->get_mailbox ();
506
507 return s;
508}
509
510void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
511{
512 scoped_lock_t locker (_slot_sync);
513
514 // Free the associated thread slot.
515 uint32_t tid = socket_->get_tid ();
516 _empty_slots.push_back (tid);
517 _slots[tid] = NULL;
518
519 // Remove the socket from the list of sockets.
520 _sockets.erase (socket_);
521
522 // If zmq_ctx_term() was already called and there are no more socket
523 // we can ask reaper thread to terminate.
524 if (_terminating && _sockets.empty ())
525 _reaper->stop ();
526}
527
// Return the reaper thread object (NULL until start () has run).
zmq::object_t *zmq::ctx_t::get_reaper ()
{
    return _reaper;
}
532
// Thread-option defaults; they can be changed later via set () and take
// effect when start_thread () launches a thread.
zmq::thread_ctx_t::thread_ctx_t () :
    _thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
    _thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
}
538
// Launch a background thread using the context-wide scheduling
// parameters, giving it a name of the form "<prefix>/ZMQbg/<name_>"
// (pieces are omitted when the prefix is empty or name_ is NULL).
void zmq::thread_ctx_t::start_thread (thread_t &thread_,
                                      thread_fn *tfn_,
                                      void *arg_,
                                      const char *name_) const
{
    thread_.setSchedulingParameters (_thread_priority, _thread_sched_policy,
                                     _thread_affinity_cpus);

    // 16-byte buffer: names longer than 15 chars are silently truncated
    // by snprintf.
    char namebuf[16] = "";
    snprintf (namebuf, sizeof (namebuf), "%s%sZMQbg%s%s",
              _thread_name_prefix.empty () ? "" : _thread_name_prefix.c_str (),
              _thread_name_prefix.empty () ? "" : "/", name_ ? "/" : "",
              name_ ? name_ : "");
    thread_.start (tfn_, arg_, namebuf);
}
554
555int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
556{
557 bool is_int = (optvallen_ == sizeof (int));
558 int value = 0;
559 if (is_int)
560 memcpy (&value, optval_, sizeof (int));
561
562 switch (option_) {
563 case ZMQ_THREAD_SCHED_POLICY:
564 if (is_int && value >= 0) {
565 scoped_lock_t locker (_opt_sync);
566 _thread_sched_policy = value;
567 return 0;
568 }
569 break;
570
571 case ZMQ_THREAD_AFFINITY_CPU_ADD:
572 if (is_int && value >= 0) {
573 scoped_lock_t locker (_opt_sync);
574 _thread_affinity_cpus.insert (value);
575 return 0;
576 }
577 break;
578
579 case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
580 if (is_int && value >= 0) {
581 scoped_lock_t locker (_opt_sync);
582 if (0 == _thread_affinity_cpus.erase (value)) {
583 errno = EINVAL;
584 return -1;
585 }
586 return 0;
587 }
588 break;
589
590 case ZMQ_THREAD_PRIORITY:
591 if (is_int && value >= 0) {
592 scoped_lock_t locker (_opt_sync);
593 _thread_priority = value;
594 return 0;
595 }
596 break;
597
598 case ZMQ_THREAD_NAME_PREFIX:
599 // start_thread() allows max 16 chars for thread name
600 if (is_int) {
601 std::ostringstream s;
602 s << value;
603 scoped_lock_t locker (_opt_sync);
604 _thread_name_prefix = s.str ();
605 return 0;
606 } else if (optvallen_ > 0 && optvallen_ <= 16) {
607 scoped_lock_t locker (_opt_sync);
608 _thread_name_prefix.assign (static_cast<const char *> (optval_),
609 optvallen_);
610 return 0;
611 }
612 break;
613 }
614
615 errno = EINVAL;
616 return -1;
617}
618
619int zmq::thread_ctx_t::get (int option_,
620 void *optval_,
621 const size_t *optvallen_)
622{
623 const bool is_int = (*optvallen_ == sizeof (int));
624 int *value = static_cast<int *> (optval_);
625
626 switch (option_) {
627 case ZMQ_THREAD_SCHED_POLICY:
628 if (is_int) {
629 scoped_lock_t locker (_opt_sync);
630 *value = _thread_sched_policy;
631 return 0;
632 }
633 break;
634
635 case ZMQ_THREAD_NAME_PREFIX:
636 if (is_int) {
637 scoped_lock_t locker (_opt_sync);
638 *value = atoi (_thread_name_prefix.c_str ());
639 return 0;
640 } else if (*optvallen_ >= _thread_name_prefix.size ()) {
641 scoped_lock_t locker (_opt_sync);
642 memcpy (optval_, _thread_name_prefix.data (),
643 _thread_name_prefix.size ());
644 return 0;
645 }
646 break;
647 }
648
649 errno = EINVAL;
650 return -1;
651}
652
// Deliver a command to the mailbox registered in slot tid_ (a socket,
// an I/O thread, the reaper, or the zmq_ctx_term mailbox).
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
    _slots[tid_]->send (command_);
}
657
658zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
659{
660 if (_io_threads.empty ())
661 return NULL;
662
663 // Find the I/O thread with minimum load.
664 int min_load = -1;
665 io_thread_t *selected_io_thread = NULL;
666 for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
667 if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
668 int load = _io_threads[i]->get_load ();
669 if (selected_io_thread == NULL || load < min_load) {
670 min_load = load;
671 selected_io_thread = _io_threads[i];
672 }
673 }
674 }
675 return selected_io_thread;
676}
677
678int zmq::ctx_t::register_endpoint (const char *addr_,
679 const endpoint_t &endpoint_)
680{
681 scoped_lock_t locker (_endpoints_sync);
682
683 const bool inserted =
684 _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), endpoint_)
685 .second;
686 if (!inserted) {
687 errno = EADDRINUSE;
688 return -1;
689 }
690 return 0;
691}
692
693int zmq::ctx_t::unregister_endpoint (const std::string &addr_,
694 socket_base_t *socket_)
695{
696 scoped_lock_t locker (_endpoints_sync);
697
698 const endpoints_t::iterator it = _endpoints.find (addr_);
699 if (it == _endpoints.end () || it->second.socket != socket_) {
700 errno = ENOENT;
701 return -1;
702 }
703
704 // Remove endpoint.
705 _endpoints.erase (it);
706
707 return 0;
708}
709
// Remove every inproc endpoint registered by the given socket.
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
    scoped_lock_t locker (_endpoints_sync);

    for (endpoints_t::iterator it = _endpoints.begin (),
                               end = _endpoints.end ();
         it != end;) {
        if (it->second.socket == socket_)
#if __cplusplus >= 201103L
            // C++11 erase returns the iterator to the next element.
            it = _endpoints.erase (it);
#else
            // Pre-C++11: advance first, then erase the old position.
            _endpoints.erase (it++);
#endif
        else
            ++it;
    }
}
727
728zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
729{
730 scoped_lock_t locker (_endpoints_sync);
731
732 endpoints_t::iterator it = _endpoints.find (addr_);
733 if (it == _endpoints.end ()) {
734 errno = ECONNREFUSED;
735 endpoint_t empty = {NULL, options_t ()};
736 return empty;
737 }
738 endpoint_t endpoint = it->second;
739
740 // Increment the command sequence number of the peer so that it won't
741 // get deallocated until "bind" command is issued by the caller.
742 // The subsequent 'bind' has to be called with inc_seqnum parameter
743 // set to false, so that the seqnum isn't incremented twice.
744 endpoint.socket->inc_seqnum ();
745
746 return endpoint;
747}
748
// Called from an inproc connect: if the address is already bound, wire
// up the pre-created pipe pair immediately; otherwise queue the
// connection until a future bind triggers connect_pending ().
void zmq::ctx_t::pend_connection (const std::string &addr_,
                                  const endpoint_t &endpoint_,
                                  pipe_t **pipes_)
{
    scoped_lock_t locker (_endpoints_sync);

    // pipes_[0] is the connect-side end, pipes_[1] the bind-side end.
    const pending_connection_t pending_connection = {endpoint_, pipes_[0],
                                                     pipes_[1]};

    endpoints_t::iterator it = _endpoints.find (addr_);
    if (it == _endpoints.end ()) {
        // Still no bind.
        // Presumably keeps the connecting socket from being deallocated
        // while the connection is pending — matches the seqnum protocol
        // documented in find_endpoint (); confirm.
        endpoint_.socket->inc_seqnum ();
        _pending_connections.ZMQ_MAP_INSERT_OR_EMPLACE (addr_,
                                                        pending_connection);
    } else {
        // Bind has happened in the mean time, connect directly
        connect_inproc_sockets (it->second.socket, it->second.options,
                                pending_connection, connect_side);
    }
}
770
771void zmq::ctx_t::connect_pending (const char *addr_,
772 zmq::socket_base_t *bind_socket_)
773{
774 scoped_lock_t locker (_endpoints_sync);
775
776 std::pair<pending_connections_t::iterator, pending_connections_t::iterator>
777 pending = _pending_connections.equal_range (addr_);
778 for (pending_connections_t::iterator p = pending.first; p != pending.second;
779 ++p)
780 connect_inproc_sockets (bind_socket_, _endpoints[addr_].options,
781 p->second, bind_side);
782
783 _pending_connections.erase (pending.first, pending.second);
784}
785
// Complete an inproc connection by attaching the pre-created bind-side
// pipe to the bound socket. side_ records which path got here: bind_side
// from connect_pending (), connect_side from pend_connection ().
void zmq::ctx_t::connect_inproc_sockets (
  zmq::socket_base_t *bind_socket_,
  options_t &bind_options_,
  const pending_connection_t &pending_connection_,
  side side_)
{
    bind_socket_->inc_seqnum ();
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());

    if (!bind_options_.recv_routing_id) {
        // The binder does not want routing ids: read and discard the
        // message already sitting in the bind-side pipe.
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

    if (!get_effective_conflate_option (pending_connection_.endpoint.options)) {
        // Boost each pipe's HWM by the other side's setting, then apply
        // the connecting side's limits (connect pipe sends into the
        // binder's rcvhwm and vice versa).
        pending_connection_.connect_pipe->set_hwms_boost (bind_options_.sndhwm,
                                                          bind_options_.rcvhwm);
        pending_connection_.bind_pipe->set_hwms_boost (
          pending_connection_.endpoint.options.sndhwm,
          pending_connection_.endpoint.options.rcvhwm);

        pending_connection_.connect_pipe->set_hwms (
          pending_connection_.endpoint.options.rcvhwm,
          pending_connection_.endpoint.options.sndhwm);
        pending_connection_.bind_pipe->set_hwms (bind_options_.rcvhwm,
                                                 bind_options_.sndhwm);
    } else {
        // With conflate in effect, HWMs are disabled (-1).
        pending_connection_.connect_pipe->set_hwms (-1, -1);
        pending_connection_.bind_pipe->set_hwms (-1, -1);
    }

    if (side_ == bind_side) {
        // Process the bind command synchronously on the bound socket.
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (
          pending_connection_.endpoint.socket);
    } else
        // On the connect path, hand the pipe over via a bind command.
        pending_connection_.connect_pipe->send_bind (
          bind_socket_, pending_connection_.bind_pipe, false);

    // When a ctx is terminated all pending inproc connection will be
    // connected, but the socket will already be closed and the pipe will be
    // in waiting_for_delimiter state, which means no more writes can be done
    // and the routing id write fails and causes an assert. Check if the socket
    // is open before sending.
    if (pending_connection_.endpoint.options.recv_routing_id
        && pending_connection_.endpoint.socket->check_tag ()) {
        send_routing_id (pending_connection_.bind_pipe, bind_options_);
    }
}
841
#ifdef ZMQ_HAVE_VMCI

// Lazily obtain (and cache) the VMCI socket address family. The fd that
// comes back with it is kept open until terminate () releases it.
int zmq::ctx_t::get_vmci_socket_family ()
{
    zmq::scoped_lock_t locker (_vmci_sync);

    if (_vmci_fd == -1) {
        _vmci_family = VMCISock_GetAFValueFd (&_vmci_fd);

        if (_vmci_fd != -1) {
#ifdef FD_CLOEXEC
            // Keep the fd from leaking into child processes.
            int rc = fcntl (_vmci_fd, F_SETFD, FD_CLOEXEC);
            errno_assert (rc != -1);
#endif
        }
    }

    return _vmci_family;
}

#endif
863
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs. Incremented in create_socket ().
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
868