1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30// "Tell them I was a writer.
31// A maker of software.
32// A humanist. A father.
33// And many things.
34// But above all, a writer.
35// Thank You. :)"
36// - Pieter Hintjens
37
38#include "precompiled.hpp"
39#define ZMQ_TYPE_UNSAFE
40
41#include "macros.hpp"
42#include "poller.hpp"
43
44#if !defined ZMQ_HAVE_POLLER
45// On AIX platform, poll.h has to be included first to get consistent
46// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
47// instead of 'events' and 'revents' and defines macros to map from POSIX-y
48// names to AIX-specific names).
49#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
50#include <poll.h>
51#endif
52
53#include "polling_util.hpp"
54#endif
55
56// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
57// zmq.h must be included *after* poll.h for AIX to build properly
58//#include "../include/zmq.h"
59
60#if !defined ZMQ_HAVE_WINDOWS
61#include <unistd.h>
62#ifdef ZMQ_HAVE_VXWORKS
63#include <strings.h>
64#endif
65#endif
66
67// XSI vector I/O
68#if defined ZMQ_HAVE_UIO
69#include <sys/uio.h>
70#else
// Fallback definition of the scatter/gather element, compiled only when
// ZMQ_HAVE_UIO is not defined and <sys/uio.h> is therefore not included.
struct iovec
{
    // Start of the buffer described by this element.
    void *iov_base;
    // Number of bytes in the buffer (used as the memcpy/malloc size below).
    size_t iov_len;
};
76#endif
77
78#include <string.h>
79#include <stdlib.h>
80#include <new>
81#include <climits>
82
83#include "proxy.hpp"
84#include "socket_base.hpp"
85#include "stdint.hpp"
86#include "config.hpp"
87#include "likely.hpp"
88#include "clock.hpp"
89#include "ctx.hpp"
90#include "err.hpp"
91#include "msg.hpp"
92#include "fd.hpp"
93#include "metadata.hpp"
94#include "socket_poller.hpp"
95#include "timers.hpp"
96#include "ip.hpp"
97#include "address.hpp"
98
99#if defined ZMQ_HAVE_OPENPGM
100#define __PGM_WININT_H__
101#include <pgm/pgm.h>
102#endif
103
// Compile-time check that zmq::msg_t and zmq_msg_t have exactly the same
// size. This file reinterpret_casts zmq_msg_t pointers to zmq::msg_t
// pointers, so the two types must occupy the same storage. When the sizes
// differ the array bound evaluates to -1 and the typedef fails to compile.
typedef char
  check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
107
108
109void zmq_version (int *major_, int *minor_, int *patch_)
110{
111 *major_ = ZMQ_VERSION_MAJOR;
112 *minor_ = ZMQ_VERSION_MINOR;
113 *patch_ = ZMQ_VERSION_PATCH;
114}
115
116
117const char *zmq_strerror (int errnum_)
118{
119 return zmq::errno_to_string (errnum_);
120}
121
// Returns the current value of errno as seen by the library.
int zmq_errno (void)
{
    const int current_error = errno;
    return current_error;
}
126
127
128// New context API
129
130void *zmq_ctx_new (void)
131{
132 // We do this before the ctx constructor since its embedded mailbox_t
133 // object needs the network to be up and running (at least on Windows).
134 if (!zmq::initialize_network ()) {
135 return NULL;
136 }
137
138 // Create 0MQ context.
139 zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
140 if (ctx) {
141 if (!ctx->valid ()) {
142 delete ctx;
143 return NULL;
144 }
145 }
146 return ctx;
147}
148
149int zmq_ctx_term (void *ctx_)
150{
151 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
152 errno = EFAULT;
153 return -1;
154 }
155
156 int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate ();
157 int en = errno;
158
159 // Shut down only if termination was not interrupted by a signal.
160 if (!rc || en != EINTR) {
161 zmq::shutdown_network ();
162 }
163
164 errno = en;
165 return rc;
166}
167
168int zmq_ctx_shutdown (void *ctx_)
169{
170 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
171 errno = EFAULT;
172 return -1;
173 }
174 return (static_cast<zmq::ctx_t *> (ctx_))->shutdown ();
175}
176
177int zmq_ctx_set (void *ctx_, int option_, int optval_)
178{
179 return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int));
180}
181
182int zmq_ctx_set_ext (void *ctx_,
183 int option_,
184 const void *optval_,
185 size_t optvallen_)
186{
187 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
188 errno = EFAULT;
189 return -1;
190 }
191 return (static_cast<zmq::ctx_t *> (ctx_))
192 ->set (option_, optval_, optvallen_);
193}
194
195int zmq_ctx_get (void *ctx_, int option_)
196{
197 int optval = 0;
198 size_t optvallen = sizeof (int);
199 if (zmq_ctx_get_ext (ctx_, option_, &optval, &optvallen) == 0) {
200 return optval;
201 }
202
203 errno = EFAULT;
204 return -1;
205}
206
207int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_)
208{
209 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
210 errno = EFAULT;
211 return -1;
212 }
213 return (static_cast<zmq::ctx_t *> (ctx_))
214 ->get (option_, optval_, optvallen_);
215}
216
217
218// Stable/legacy context API
219
220void *zmq_init (int io_threads_)
221{
222 if (io_threads_ >= 0) {
223 void *ctx = zmq_ctx_new ();
224 zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
225 return ctx;
226 }
227 errno = EINVAL;
228 return NULL;
229}
230
231int zmq_term (void *ctx_)
232{
233 return zmq_ctx_term (ctx_);
234}
235
236int zmq_ctx_destroy (void *ctx_)
237{
238 return zmq_ctx_term (ctx_);
239}
240
241
242// Sockets
243
244static zmq::socket_base_t *as_socket_base_t (void *s_)
245{
246 zmq::socket_base_t *s = static_cast<zmq::socket_base_t *> (s_);
247 if (!s_ || !s->check_tag ()) {
248 errno = ENOTSOCK;
249 return NULL;
250 }
251 return s;
252}
253
254void *zmq_socket (void *ctx_, int type_)
255{
256 if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
257 errno = EFAULT;
258 return NULL;
259 }
260 zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
261 zmq::socket_base_t *s = ctx->create_socket (type_);
262 return (void *) s;
263}
264
265int zmq_close (void *s_)
266{
267 zmq::socket_base_t *s = as_socket_base_t (s_);
268 if (!s)
269 return -1;
270 s->close ();
271 return 0;
272}
273
274int zmq_setsockopt (void *s_,
275 int option_,
276 const void *optval_,
277 size_t optvallen_)
278{
279 zmq::socket_base_t *s = as_socket_base_t (s_);
280 if (!s)
281 return -1;
282 return s->setsockopt (option_, optval_, optvallen_);
283}
284
285int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
286{
287 zmq::socket_base_t *s = as_socket_base_t (s_);
288 if (!s)
289 return -1;
290 return s->getsockopt (option_, optval_, optvallen_);
291}
292
293int zmq_socket_monitor_versioned (
294 void *s_, const char *addr_, uint64_t events_, int event_version_, int type_)
295{
296 zmq::socket_base_t *s = as_socket_base_t (s_);
297 if (!s)
298 return -1;
299 return s->monitor (addr_, events_, event_version_, type_);
300}
301
302int zmq_socket_monitor (void *s_, const char *addr_, int events_)
303{
304 return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR);
305}
306
307int zmq_join (void *s_, const char *group_)
308{
309 zmq::socket_base_t *s = as_socket_base_t (s_);
310 if (!s)
311 return -1;
312 return s->join (group_);
313}
314
315int zmq_leave (void *s_, const char *group_)
316{
317 zmq::socket_base_t *s = as_socket_base_t (s_);
318 if (!s)
319 return -1;
320 return s->leave (group_);
321}
322
323int zmq_bind (void *s_, const char *addr_)
324{
325 zmq::socket_base_t *s = as_socket_base_t (s_);
326 if (!s)
327 return -1;
328 return s->bind (addr_);
329}
330
331int zmq_connect (void *s_, const char *addr_)
332{
333 zmq::socket_base_t *s = as_socket_base_t (s_);
334 if (!s)
335 return -1;
336 return s->connect (addr_);
337}
338
339int zmq_unbind (void *s_, const char *addr_)
340{
341 zmq::socket_base_t *s = as_socket_base_t (s_);
342 if (!s)
343 return -1;
344 return s->term_endpoint (addr_);
345}
346
347int zmq_disconnect (void *s_, const char *addr_)
348{
349 zmq::socket_base_t *s = as_socket_base_t (s_);
350 if (!s)
351 return -1;
352 return s->term_endpoint (addr_);
353}
354
355// Sending functions.
356
357static inline int
358s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
359{
360 size_t sz = zmq_msg_size (msg_);
361 int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
362 if (unlikely (rc < 0))
363 return -1;
364
365 // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
366 // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
367 size_t max_msgsz = INT_MAX;
368
369 // Truncate returned size to INT_MAX to avoid overflow to negative values
370 return static_cast<int> (sz < max_msgsz ? sz : max_msgsz);
371}
372
373/* To be deprecated once zmq_msg_send() is stable */
374int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
375{
376 return zmq_msg_send (msg_, s_, flags_);
377}
378
379int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
380{
381 zmq::socket_base_t *s = as_socket_base_t (s_);
382 if (!s)
383 return -1;
384 zmq_msg_t msg;
385 if (zmq_msg_init_size (&msg, len_))
386 return -1;
387
388 // We explicitly allow a send from NULL, size zero
389 if (len_) {
390 assert (buf_);
391 memcpy (zmq_msg_data (&msg), buf_, len_);
392 }
393 int rc = s_sendmsg (s, &msg, flags_);
394 if (unlikely (rc < 0)) {
395 int err = errno;
396 int rc2 = zmq_msg_close (&msg);
397 errno_assert (rc2 == 0);
398 errno = err;
399 return -1;
400 }
401 // Note the optimisation here. We don't close the msg object as it is
402 // empty anyway. This may change when implementation of zmq_msg_t changes.
403 return rc;
404}
405
406int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
407{
408 zmq::socket_base_t *s = as_socket_base_t (s_);
409 if (!s)
410 return -1;
411 zmq_msg_t msg;
412 int rc =
413 zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL);
414 if (rc != 0)
415 return -1;
416
417 rc = s_sendmsg (s, &msg, flags_);
418 if (unlikely (rc < 0)) {
419 int err = errno;
420 int rc2 = zmq_msg_close (&msg);
421 errno_assert (rc2 == 0);
422 errno = err;
423 return -1;
424 }
425 // Note the optimisation here. We don't close the msg object as it is
426 // empty anyway. This may change when implementation of zmq_msg_t changes.
427 return rc;
428}
429
430
431// Send multiple messages.
432// TODO: this function has no man page
433//
434// If flag bit ZMQ_SNDMORE is set the vector is treated as
435// a single multi-part message, i.e. the last message has
436// ZMQ_SNDMORE bit switched off.
437//
438int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
439{
440 zmq::socket_base_t *s = as_socket_base_t (s_);
441 if (!s)
442 return -1;
443 if (unlikely (count_ <= 0 || !a_)) {
444 errno = EINVAL;
445 return -1;
446 }
447
448 int rc = 0;
449 zmq_msg_t msg;
450
451 for (size_t i = 0; i < count_; ++i) {
452 rc = zmq_msg_init_size (&msg, a_[i].iov_len);
453 if (rc != 0) {
454 rc = -1;
455 break;
456 }
457 memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
458 if (i == count_ - 1)
459 flags_ = flags_ & ~ZMQ_SNDMORE;
460 rc = s_sendmsg (s, &msg, flags_);
461 if (unlikely (rc < 0)) {
462 int err = errno;
463 int rc2 = zmq_msg_close (&msg);
464 errno_assert (rc2 == 0);
465 errno = err;
466 rc = -1;
467 break;
468 }
469 }
470 return rc;
471}
472
473// Receiving functions.
474
475static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
476{
477 int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
478 if (unlikely (rc < 0))
479 return -1;
480
481 // Truncate returned size to INT_MAX to avoid overflow to negative values
482 size_t sz = zmq_msg_size (msg_);
483 return static_cast<int> (sz < INT_MAX ? sz : INT_MAX);
484}
485
486/* To be deprecated once zmq_msg_recv() is stable */
487int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
488{
489 return zmq_msg_recv (msg_, s_, flags_);
490}
491
492
493int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
494{
495 zmq::socket_base_t *s = as_socket_base_t (s_);
496 if (!s)
497 return -1;
498 zmq_msg_t msg;
499 int rc = zmq_msg_init (&msg);
500 errno_assert (rc == 0);
501
502 int nbytes = s_recvmsg (s, &msg, flags_);
503 if (unlikely (nbytes < 0)) {
504 int err = errno;
505 rc = zmq_msg_close (&msg);
506 errno_assert (rc == 0);
507 errno = err;
508 return -1;
509 }
510
511 // An oversized message is silently truncated.
512 size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
513
514 // We explicitly allow a null buffer argument if len is zero
515 if (to_copy) {
516 assert (buf_);
517 memcpy (buf_, zmq_msg_data (&msg), to_copy);
518 }
519 rc = zmq_msg_close (&msg);
520 errno_assert (rc == 0);
521
522 return nbytes;
523}
524
525// Receive a multi-part message
526//
527// Receives up to *count_ parts of a multi-part message.
528// Sets *count_ to the actual number of parts read.
529// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
530// Returns number of message parts read, or -1 on error.
531//
532// Note: even if -1 is returned, some parts of the message
533// may have been read. Therefore the client must consult
534// *count_ to retrieve message parts successfully read,
535// even if -1 is returned.
536//
537// The iov_base* buffers of each iovec *a_ filled in by this
538// function may be freed using free().
539// TODO: this function has no man page
540//
541int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
542{
543 zmq::socket_base_t *s = as_socket_base_t (s_);
544 if (!s)
545 return -1;
546 if (unlikely (!count_ || *count_ <= 0 || !a_)) {
547 errno = EINVAL;
548 return -1;
549 }
550
551 size_t count = *count_;
552 int nread = 0;
553 bool recvmore = true;
554
555 *count_ = 0;
556
557 for (size_t i = 0; recvmore && i < count; ++i) {
558 zmq_msg_t msg;
559 int rc = zmq_msg_init (&msg);
560 errno_assert (rc == 0);
561
562 int nbytes = s_recvmsg (s, &msg, flags_);
563 if (unlikely (nbytes < 0)) {
564 int err = errno;
565 rc = zmq_msg_close (&msg);
566 errno_assert (rc == 0);
567 errno = err;
568 nread = -1;
569 break;
570 }
571
572 a_[i].iov_len = zmq_msg_size (&msg);
573 a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len));
574 if (unlikely (!a_[i].iov_base)) {
575 errno = ENOMEM;
576 return -1;
577 }
578 memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
579 a_[i].iov_len);
580 // Assume zmq_socket ZMQ_RVCMORE is properly set.
581 const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg);
582 recvmore = p_msg->flags () & zmq::msg_t::more;
583 rc = zmq_msg_close (&msg);
584 errno_assert (rc == 0);
585 ++*count_;
586 ++nread;
587 }
588 return nread;
589}
590
591// Message manipulators.
592
593int zmq_msg_init (zmq_msg_t *msg_)
594{
595 return (reinterpret_cast<zmq::msg_t *> (msg_))->init ();
596}
597
598int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
599{
600 return (reinterpret_cast<zmq::msg_t *> (msg_))->init_size (size_);
601}
602
603int zmq_msg_init_data (
604 zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_)
605{
606 return (reinterpret_cast<zmq::msg_t *> (msg_))
607 ->init_data (data_, size_, ffn_, hint_);
608}
609
610int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
611{
612 zmq::socket_base_t *s = as_socket_base_t (s_);
613 if (!s)
614 return -1;
615 return s_sendmsg (s, msg_, flags_);
616}
617
618int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
619{
620 zmq::socket_base_t *s = as_socket_base_t (s_);
621 if (!s)
622 return -1;
623 return s_recvmsg (s, msg_, flags_);
624}
625
626int zmq_msg_close (zmq_msg_t *msg_)
627{
628 return (reinterpret_cast<zmq::msg_t *> (msg_))->close ();
629}
630
631int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
632{
633 return (reinterpret_cast<zmq::msg_t *> (dest_))
634 ->move (*reinterpret_cast<zmq::msg_t *> (src_));
635}
636
637int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
638{
639 return (reinterpret_cast<zmq::msg_t *> (dest_))
640 ->copy (*reinterpret_cast<zmq::msg_t *> (src_));
641}
642
643void *zmq_msg_data (zmq_msg_t *msg_)
644{
645 return (reinterpret_cast<zmq::msg_t *> (msg_))->data ();
646}
647
648size_t zmq_msg_size (const zmq_msg_t *msg_)
649{
650 return ((zmq::msg_t *) msg_)->size ();
651}
652
653int zmq_msg_more (const zmq_msg_t *msg_)
654{
655 return zmq_msg_get (msg_, ZMQ_MORE);
656}
657
658int zmq_msg_get (const zmq_msg_t *msg_, int property_)
659{
660 const char *fd_string;
661
662 switch (property_) {
663 case ZMQ_MORE:
664 return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
665 case ZMQ_SRCFD:
666 fd_string = zmq_msg_gets (msg_, "__fd");
667 if (fd_string == NULL)
668 return -1;
669
670 return atoi (fd_string);
671 case ZMQ_SHARED:
672 return (((zmq::msg_t *) msg_)->is_cmsg ())
673 || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared)
674 ? 1
675 : 0;
676 default:
677 errno = EINVAL;
678 return -1;
679 }
680}
681
682int zmq_msg_set (zmq_msg_t *, int, int)
683{
684 // No properties supported at present
685 errno = EINVAL;
686 return -1;
687}
688
689int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
690{
691 return (reinterpret_cast<zmq::msg_t *> (msg_))
692 ->set_routing_id (routing_id_);
693}
694
695uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
696{
697 return (reinterpret_cast<zmq::msg_t *> (msg_))->get_routing_id ();
698}
699
700int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
701{
702 return (reinterpret_cast<zmq::msg_t *> (msg_))->set_group (group_);
703}
704
705const char *zmq_msg_group (zmq_msg_t *msg_)
706{
707 return (reinterpret_cast<zmq::msg_t *> (msg_))->group ();
708}
709
710// Get message metadata string
711
712const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
713{
714 const zmq::metadata_t *metadata =
715 reinterpret_cast<const zmq::msg_t *> (msg_)->metadata ();
716 const char *value = NULL;
717 if (metadata)
718 value = metadata->get (std::string (property_));
719 if (value)
720 return value;
721
722 errno = EINVAL;
723 return NULL;
724}
725
726// Polling.
727
728#if defined ZMQ_HAVE_POLLER
// Implements zmq_poll on top of a temporary socket_poller_t.
//
// Registers every entry of items_ with a local poller, merging the event
// masks of entries that refer to the same socket or the same raw fd, waits
// with zmq_poller_wait_all, and then copies the fired events back into
// items_[i].revents.
//
// Returns the value of zmq_poller_wait_all on success, 0 when the wait
// failed with EAGAIN, and the negative result of the failing call
// otherwise. The events array is released on every exit path.
// NOTE(review): nitems_ is used directly as the array size; the caller is
// expected to pass a positive count.
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
    // implement zmq_poll on top of zmq_poller
    int rc;
    zmq_poller_event_t *events;
    zmq::socket_poller_t poller;
    events = new (std::nothrow) zmq_poller_event_t[nitems_];
    alloc_assert (events);

    // Becomes true as soon as two items refer to the same socket/fd.
    bool repeat_items = false;
    // Register sockets with poller
    for (int i = 0; i < nitems_; i++) {
        items_[i].revents = 0;

        // modify: this socket/fd was registered by an earlier item, so its
        // registration is updated with the merged mask instead of re-added.
        bool modify = false;
        short e = items_[i].events;
        if (items_[i].socket) {
            // Poll item is a 0MQ socket.
            for (int j = 0; j < i; ++j) {
                // Check for repeat entries
                if (items_[j].socket == items_[i].socket) {
                    repeat_items = true;
                    modify = true;
                    // Accumulate the events requested by the earlier entry.
                    e |= items_[j].events;
                }
            }
            if (modify) {
                rc = zmq_poller_modify (&poller, items_[i].socket, e);
            } else {
                rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
            }
            if (rc < 0) {
                delete[] events;
                return rc;
            }
        } else {
            // Poll item is a raw file descriptor.
            for (int j = 0; j < i; ++j) {
                // Check for repeat entries
                if (!items_[j].socket && items_[j].fd == items_[i].fd) {
                    repeat_items = true;
                    modify = true;
                    // Accumulate the events requested by the earlier entry.
                    e |= items_[j].events;
                }
            }
            if (modify) {
                rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
            } else {
                rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
            }
            if (rc < 0) {
                delete[] events;
                return rc;
            }
        }
    }

    // Wait for events
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
    if (rc < 0) {
        delete[] events;
        // EAGAIN from the wait is reported to the caller as "no events".
        if (zmq_errno () == EAGAIN) {
            return 0;
        }
        return rc;
    }

    // Transform poller events into zmq_pollitem events.
    // items_ contains all items, while events only contains fired events.
    // If no sockets are repeated (likely), the two are still co-ordered, so step through the items
    // checking for matches only on the first event.
    // If there are repeat items, they cannot be assumed to be co-ordered,
    // so each pollitem must check fired events from the beginning.
    int j_start = 0, found_events = rc;
    for (int i = 0; i < nitems_; i++) {
        for (int j = j_start; j < found_events; ++j) {
            // Match either on the socket handle, or on the fd when neither
            // the item nor the event carries a socket.
            if ((items_[i].socket && items_[i].socket == events[j].socket)
                || (!(items_[i].socket || events[j].socket)
                    && items_[i].fd == events[j].fd)) {
                // Report only the events this particular item asked for.
                items_[i].revents = events[j].events & items_[i].events;
                if (!repeat_items) {
                    // no repeats, we can ignore events we've already seen
                    j_start++;
                }
                break;
            }
            if (!repeat_items) {
                // no repeats, never have to look at j > j_start
                break;
            }
        }
    }

    // Cleanup
    delete[] events;
    return rc;
}
826#endif // ZMQ_HAVE_POLLER
827
828int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
829{
830#if defined ZMQ_HAVE_POLLER
831 // if poller is present, use that if there is at least 1 thread-safe socket,
832 // otherwise fall back to the previous implementation as it's faster.
833 for (int i = 0; i != nitems_; i++) {
834 if (items_[i].socket
835 && as_socket_base_t (items_[i].socket)->is_thread_safe ()) {
836 return zmq_poller_poll (items_, nitems_, timeout_);
837 }
838 }
839#endif // ZMQ_HAVE_POLLER
840#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
841 if (unlikely (nitems_ < 0)) {
842 errno = EINVAL;
843 return -1;
844 }
845 if (unlikely (nitems_ == 0)) {
846 if (timeout_ == 0)
847 return 0;
848#if defined ZMQ_HAVE_WINDOWS
849 Sleep (timeout_ > 0 ? timeout_ : INFINITE);
850 return 0;
851#elif defined ZMQ_HAVE_VXWORKS
852 struct timespec ns_;
853 ns_.tv_sec = timeout_ / 1000;
854 ns_.tv_nsec = timeout_ % 1000 * 1000000;
855 return nanosleep (&ns_, 0);
856#else
857 return usleep (timeout_ * 1000);
858#endif
859 }
860 if (!items_) {
861 errno = EFAULT;
862 return -1;
863 }
864
865 zmq::clock_t clock;
866 uint64_t now = 0;
867 uint64_t end = 0;
868#if defined ZMQ_POLL_BASED_ON_POLL
869 zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
870
871 // Build pollset for poll () system call.
872 for (int i = 0; i != nitems_; i++) {
873 // If the poll item is a 0MQ socket, we poll on the file descriptor
874 // retrieved by the ZMQ_FD socket option.
875 if (items_[i].socket) {
876 size_t zmq_fd_size = sizeof (zmq::fd_t);
877 if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
878 &zmq_fd_size)
879 == -1) {
880 return -1;
881 }
882 pollfds[i].events = items_[i].events ? POLLIN : 0;
883 }
884 // Else, the poll item is a raw file descriptor. Just convert the
885 // events to normal POLLIN/POLLOUT for poll ().
886 else {
887 pollfds[i].fd = items_[i].fd;
888 pollfds[i].events =
889 (items_[i].events & ZMQ_POLLIN ? POLLIN : 0)
890 | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0)
891 | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
892 }
893 }
894#else
895 // Ensure we do not attempt to select () on more than FD_SETSIZE
896 // file descriptors.
897 // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
898 zmq_assert (nitems_ <= FD_SETSIZE);
899
900 zmq::optimized_fd_set_t pollset_in (nitems_);
901 FD_ZERO (pollset_in.get ());
902 zmq::optimized_fd_set_t pollset_out (nitems_);
903 FD_ZERO (pollset_out.get ());
904 zmq::optimized_fd_set_t pollset_err (nitems_);
905 FD_ZERO (pollset_err.get ());
906
907 zmq::fd_t maxfd = 0;
908
909 // Build the fd_sets for passing to select ().
910 for (int i = 0; i != nitems_; i++) {
911 // If the poll item is a 0MQ socket we are interested in input on the
912 // notification file descriptor retrieved by the ZMQ_FD socket option.
913 if (items_[i].socket) {
914 size_t zmq_fd_size = sizeof (zmq::fd_t);
915 zmq::fd_t notify_fd;
916 if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
917 &zmq_fd_size)
918 == -1)
919 return -1;
920 if (items_[i].events) {
921 FD_SET (notify_fd, pollset_in.get ());
922 if (maxfd < notify_fd)
923 maxfd = notify_fd;
924 }
925 }
926 // Else, the poll item is a raw file descriptor. Convert the poll item
927 // events to the appropriate fd_sets.
928 else {
929 if (items_[i].events & ZMQ_POLLIN)
930 FD_SET (items_[i].fd, pollset_in.get ());
931 if (items_[i].events & ZMQ_POLLOUT)
932 FD_SET (items_[i].fd, pollset_out.get ());
933 if (items_[i].events & ZMQ_POLLERR)
934 FD_SET (items_[i].fd, pollset_err.get ());
935 if (maxfd < items_[i].fd)
936 maxfd = items_[i].fd;
937 }
938 }
939
940 zmq::optimized_fd_set_t inset (nitems_);
941 zmq::optimized_fd_set_t outset (nitems_);
942 zmq::optimized_fd_set_t errset (nitems_);
943#endif
944
945 bool first_pass = true;
946 int nevents = 0;
947
948 while (true) {
949#if defined ZMQ_POLL_BASED_ON_POLL
950
951 // Compute the timeout for the subsequent poll.
952 zmq::timeout_t timeout =
953 zmq::compute_timeout (first_pass, timeout_, now, end);
954
955 // Wait for events.
956 {
957 int rc = poll (&pollfds[0], nitems_, timeout);
958 if (rc == -1 && errno == EINTR) {
959 return -1;
960 }
961 errno_assert (rc >= 0);
962 }
963 // Check for the events.
964 for (int i = 0; i != nitems_; i++) {
965 items_[i].revents = 0;
966
967 // The poll item is a 0MQ socket. Retrieve pending events
968 // using the ZMQ_EVENTS socket option.
969 if (items_[i].socket) {
970 size_t zmq_events_size = sizeof (uint32_t);
971 uint32_t zmq_events;
972 if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
973 &zmq_events_size)
974 == -1) {
975 return -1;
976 }
977 if ((items_[i].events & ZMQ_POLLOUT)
978 && (zmq_events & ZMQ_POLLOUT))
979 items_[i].revents |= ZMQ_POLLOUT;
980 if ((items_[i].events & ZMQ_POLLIN)
981 && (zmq_events & ZMQ_POLLIN))
982 items_[i].revents |= ZMQ_POLLIN;
983 }
984 // Else, the poll item is a raw file descriptor, simply convert
985 // the events to zmq_pollitem_t-style format.
986 else {
987 if (pollfds[i].revents & POLLIN)
988 items_[i].revents |= ZMQ_POLLIN;
989 if (pollfds[i].revents & POLLOUT)
990 items_[i].revents |= ZMQ_POLLOUT;
991 if (pollfds[i].revents & POLLPRI)
992 items_[i].revents |= ZMQ_POLLPRI;
993 if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI))
994 items_[i].revents |= ZMQ_POLLERR;
995 }
996
997 if (items_[i].revents)
998 nevents++;
999 }
1000
1001#else
1002
1003 // Compute the timeout for the subsequent poll.
1004 timeval timeout;
1005 timeval *ptimeout;
1006 if (first_pass) {
1007 timeout.tv_sec = 0;
1008 timeout.tv_usec = 0;
1009 ptimeout = &timeout;
1010 } else if (timeout_ < 0)
1011 ptimeout = NULL;
1012 else {
1013 timeout.tv_sec = static_cast<long> ((end - now) / 1000);
1014 timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
1015 ptimeout = &timeout;
1016 }
1017
1018 // Wait for events. Ignore interrupts if there's infinite timeout.
1019 while (true) {
1020 memcpy (inset.get (), pollset_in.get (),
1021 zmq::valid_pollset_bytes (*pollset_in.get ()));
1022 memcpy (outset.get (), pollset_out.get (),
1023 zmq::valid_pollset_bytes (*pollset_out.get ()));
1024 memcpy (errset.get (), pollset_err.get (),
1025 zmq::valid_pollset_bytes (*pollset_err.get ()));
1026#if defined ZMQ_HAVE_WINDOWS
1027 int rc =
1028 select (0, inset.get (), outset.get (), errset.get (), ptimeout);
1029 if (unlikely (rc == SOCKET_ERROR)) {
1030 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
1031 wsa_assert (errno == ENOTSOCK);
1032 return -1;
1033 }
1034#else
1035 int rc = select (maxfd + 1, inset.get (), outset.get (),
1036 errset.get (), ptimeout);
1037 if (unlikely (rc == -1)) {
1038 errno_assert (errno == EINTR || errno == EBADF);
1039 return -1;
1040 }
1041#endif
1042 break;
1043 }
1044
1045 // Check for the events.
1046 for (int i = 0; i != nitems_; i++) {
1047 items_[i].revents = 0;
1048
1049 // The poll item is a 0MQ socket. Retrieve pending events
1050 // using the ZMQ_EVENTS socket option.
1051 if (items_[i].socket) {
1052 size_t zmq_events_size = sizeof (uint32_t);
1053 uint32_t zmq_events;
1054 if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
1055 &zmq_events_size)
1056 == -1)
1057 return -1;
1058 if ((items_[i].events & ZMQ_POLLOUT)
1059 && (zmq_events & ZMQ_POLLOUT))
1060 items_[i].revents |= ZMQ_POLLOUT;
1061 if ((items_[i].events & ZMQ_POLLIN)
1062 && (zmq_events & ZMQ_POLLIN))
1063 items_[i].revents |= ZMQ_POLLIN;
1064 }
1065 // Else, the poll item is a raw file descriptor, simply convert
1066 // the events to zmq_pollitem_t-style format.
1067 else {
1068 if (FD_ISSET (items_[i].fd, inset.get ()))
1069 items_[i].revents |= ZMQ_POLLIN;
1070 if (FD_ISSET (items_[i].fd, outset.get ()))
1071 items_[i].revents |= ZMQ_POLLOUT;
1072 if (FD_ISSET (items_[i].fd, errset.get ()))
1073 items_[i].revents |= ZMQ_POLLERR;
1074 }
1075
1076 if (items_[i].revents)
1077 nevents++;
1078 }
1079#endif
1080
1081 // If timeout is zero, exit immediately whether there are events or not.
1082 if (timeout_ == 0)
1083 break;
1084
1085 // If there are events to return, we can exit immediately.
1086 if (nevents)
1087 break;
1088
1089 // At this point we are meant to wait for events but there are none.
1090 // If timeout is infinite we can just loop until we get some events.
1091 if (timeout_ < 0) {
1092 if (first_pass)
1093 first_pass = false;
1094 continue;
1095 }
1096
1097 // The timeout is finite and there are no events. In the first pass
1098 // we get a timestamp of when the polling have begun. (We assume that
1099 // first pass have taken negligible time). We also compute the time
1100 // when the polling should time out.
1101 if (first_pass) {
1102 now = clock.now_ms ();
1103 end = now + timeout_;
1104 if (now == end)
1105 break;
1106 first_pass = false;
1107 continue;
1108 }
1109
1110 // Find out whether timeout have expired.
1111 now = clock.now_ms ();
1112 if (now >= end)
1113 break;
1114 }
1115
1116 return nevents;
1117#else
1118 // Exotic platforms that support neither poll() nor select().
1119 errno = ENOTSUP;
1120 return -1;
1121#endif
1122}
1123
1124// The poller functionality
1125
1126void *zmq_poller_new (void)
1127{
1128 zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
1129 if (!poller) {
1130 errno = ENOMEM;
1131 }
1132 return poller;
1133}
1134
1135int zmq_poller_destroy (void **poller_p_)
1136{
1137 if (poller_p_) {
1138 zmq::socket_poller_t *const poller =
1139 static_cast<zmq::socket_poller_t *> (*poller_p_);
1140 if (poller && poller->check_tag ()) {
1141 delete poller;
1142 *poller_p_ = NULL;
1143 return 0;
1144 }
1145 }
1146 errno = EFAULT;
1147 return -1;
1148}
1149
1150
1151static int check_poller (void *const poller_)
1152{
1153 if (!poller_
1154 || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) {
1155 errno = EFAULT;
1156 return -1;
1157 }
1158
1159 return 0;
1160}
1161
1162static int check_events (const short events_)
1163{
1164 if (events_ & ~(ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI)) {
1165 errno = EINVAL;
1166 return -1;
1167 }
1168 return 0;
1169}
1170
1171static int check_poller_registration_args (void *const poller_, void *const s_)
1172{
1173 if (-1 == check_poller (poller_))
1174 return -1;
1175
1176 if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) {
1177 errno = ENOTSOCK;
1178 return -1;
1179 }
1180
1181 return 0;
1182}
1183
1184static int check_poller_fd_registration_args (void *const poller_,
1185 const zmq::fd_t fd_)
1186{
1187 if (-1 == check_poller (poller_))
1188 return -1;
1189
1190 if (fd_ == zmq::retired_fd) {
1191 errno = EBADF;
1192 return -1;
1193 }
1194
1195 return 0;
1196}
1197
1198int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
1199{
1200 if (-1 == check_poller_registration_args (poller_, s_)
1201 || -1 == check_events (events_))
1202 return -1;
1203
1204 zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1205
1206 return (static_cast<zmq::socket_poller_t *> (poller_))
1207 ->add (socket, user_data_, events_);
1208}
1209
1210int zmq_poller_add_fd (void *poller_,
1211 zmq::fd_t fd_,
1212 void *user_data_,
1213 short events_)
1214{
1215 if (-1 == check_poller_fd_registration_args (poller_, fd_)
1216 || -1 == check_events (events_))
1217 return -1;
1218
1219 return (static_cast<zmq::socket_poller_t *> (poller_))
1220 ->add_fd (fd_, user_data_, events_);
1221}
1222
1223
1224int zmq_poller_modify (void *poller_, void *s_, short events_)
1225{
1226 if (-1 == check_poller_registration_args (poller_, s_)
1227 || -1 == check_events (events_))
1228 return -1;
1229
1230 zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1231
1232 return (static_cast<zmq::socket_poller_t *> (poller_))
1233 ->modify (socket, events_);
1234}
1235
1236int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_)
1237{
1238 if (-1 == check_poller_fd_registration_args (poller_, fd_)
1239 || -1 == check_events (events_))
1240 return -1;
1241
1242 return (static_cast<zmq::socket_poller_t *> (poller_))
1243 ->modify_fd (fd_, events_);
1244}
1245
1246int zmq_poller_remove (void *poller_, void *s_)
1247{
1248 if (-1 == check_poller_registration_args (poller_, s_))
1249 return -1;
1250
1251 zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_);
1252
1253 return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket);
1254}
1255
1256int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_)
1257{
1258 if (-1 == check_poller_fd_registration_args (poller_, fd_))
1259 return -1;
1260
1261 return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_);
1262}
1263
1264int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
1265{
1266 int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_);
1267
1268 if (rc < 0 && event_) {
1269 // TODO this is not portable... zmq_poller_event_t contains pointers,
1270 // for which nullptr does not need to be represented by all-zeroes
1271 memset (event_, 0, sizeof (zmq_poller_event_t));
1272 }
1273 // wait_all returns number of events, but we return 0 for any success
1274 return rc >= 0 ? 0 : rc;
1275}
1276
1277int zmq_poller_wait_all (void *poller_,
1278 zmq_poller_event_t *events_,
1279 int n_events_,
1280 long timeout_)
1281{
1282 if (-1 == check_poller (poller_))
1283 return -1;
1284
1285 if (!events_) {
1286 errno = EFAULT;
1287 return -1;
1288 }
1289 if (n_events_ < 0) {
1290 errno = EINVAL;
1291 return -1;
1292 }
1293
1294 int rc =
1295 (static_cast<zmq::socket_poller_t *> (poller_))
1296 ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_),
1297 n_events_, timeout_);
1298
1299 return rc;
1300}
1301
1302int zmq_poller_fd (void *poller_, zmq_fd_t *fd_)
1303{
1304 if (!poller_
1305 || !(static_cast<zmq::socket_poller_t *> (poller_)->check_tag ())) {
1306 errno = EFAULT;
1307 return -1;
1308 }
1309 return static_cast<zmq::socket_poller_t *> (poller_)->signaler_fd (fd_);
1310}
1311
1312// Peer-specific state
1313
1314int zmq_socket_get_peer_state (void *s_,
1315 const void *routing_id_,
1316 size_t routing_id_size_)
1317{
1318 const zmq::socket_base_t *const s = as_socket_base_t (s_);
1319 if (!s)
1320 return -1;
1321
1322 return s->get_peer_state (routing_id_, routing_id_size_);
1323}
1324
1325// Timers
1326
1327void *zmq_timers_new (void)
1328{
1329 zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
1330 alloc_assert (timers);
1331 return timers;
1332}
1333
1334int zmq_timers_destroy (void **timers_p_)
1335{
1336 void *timers = *timers_p_;
1337 if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) {
1338 errno = EFAULT;
1339 return -1;
1340 }
1341 delete (static_cast<zmq::timers_t *> (timers));
1342 *timers_p_ = NULL;
1343 return 0;
1344}
1345
1346int zmq_timers_add (void *timers_,
1347 size_t interval_,
1348 zmq_timer_fn handler_,
1349 void *arg_)
1350{
1351 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1352 errno = EFAULT;
1353 return -1;
1354 }
1355
1356 return (static_cast<zmq::timers_t *> (timers_))
1357 ->add (interval_, handler_, arg_);
1358}
1359
1360int zmq_timers_cancel (void *timers_, int timer_id_)
1361{
1362 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1363 errno = EFAULT;
1364 return -1;
1365 }
1366
1367 return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_);
1368}
1369
1370int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
1371{
1372 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1373 errno = EFAULT;
1374 return -1;
1375 }
1376
1377 return (static_cast<zmq::timers_t *> (timers_))
1378 ->set_interval (timer_id_, interval_);
1379}
1380
1381int zmq_timers_reset (void *timers_, int timer_id_)
1382{
1383 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1384 errno = EFAULT;
1385 return -1;
1386 }
1387
1388 return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_);
1389}
1390
1391long zmq_timers_timeout (void *timers_)
1392{
1393 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1394 errno = EFAULT;
1395 return -1;
1396 }
1397
1398 return (static_cast<zmq::timers_t *> (timers_))->timeout ();
1399}
1400
1401int zmq_timers_execute (void *timers_)
1402{
1403 if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) {
1404 errno = EFAULT;
1405 return -1;
1406 }
1407
1408 return (static_cast<zmq::timers_t *> (timers_))->execute ();
1409}
1410
1411// The proxy functionality
1412
1413int zmq_proxy (void *frontend_, void *backend_, void *capture_)
1414{
1415 if (!frontend_ || !backend_) {
1416 errno = EFAULT;
1417 return -1;
1418 }
1419 return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1420 static_cast<zmq::socket_base_t *> (backend_),
1421 static_cast<zmq::socket_base_t *> (capture_));
1422}
1423
1424int zmq_proxy_steerable (void *frontend_,
1425 void *backend_,
1426 void *capture_,
1427 void *control_)
1428{
1429 if (!frontend_ || !backend_) {
1430 errno = EFAULT;
1431 return -1;
1432 }
1433 return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1434 static_cast<zmq::socket_base_t *> (backend_),
1435 static_cast<zmq::socket_base_t *> (capture_),
1436 static_cast<zmq::socket_base_t *> (control_));
1437}
1438
1439// The deprecated device functionality
1440
1441int zmq_device (int /* type */, void *frontend_, void *backend_)
1442{
1443 return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_),
1444 static_cast<zmq::socket_base_t *> (backend_), NULL);
1445}
1446
1447// Probe library capabilities; for now, reports on transport and security
1448
// Reports whether the library was built with the named capability.
// capability_ is compared (case-sensitively, via strcmp) against the name
// of each feature enabled at compile time. Returns non-zero (true) on a
// match and zero (false) otherwise. A NULL capability_ is treated as an
// unknown capability and yields zero.
int zmq_has (const char *capability_)
{
    // strcmp () must not be given a NULL pointer; a missing name simply
    // matches nothing.
    if (!capability_)
        return false;

#if defined(ZMQ_HAVE_IPC)
    if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_OPENPGM)
    if (strcmp (capability_, "pgm") == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_TIPC)
    if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_NORM)
    if (strcmp (capability_, "norm") == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_CURVE)
    if (strcmp (capability_, "curve") == 0)
        return true;
#endif
#if defined(HAVE_LIBGSSAPI_KRB5)
    if (strcmp (capability_, "gssapi") == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_VMCI)
    if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
        return true;
#endif
#if defined(ZMQ_BUILD_DRAFT_API)
    if (strcmp (capability_, "draft") == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_WS)
    if (strcmp (capability_, "WS") == 0)
        return true;
#endif
#if defined(ZMQ_HAVE_WSS)
    if (strcmp (capability_, "WSS") == 0)
        return true;
#endif
    // Whatever the application asked for, we don't have
    return false;
}
1494
1495int zmq_socket_monitor_pipes_stats (void *s_)
1496{
1497 zmq::socket_base_t *s = as_socket_base_t (s_);
1498 if (!s)
1499 return -1;
1500 return s->query_pipes_stats ();
1501}
1502