1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | // "Tell them I was a writer. |
31 | // A maker of software. |
32 | // A humanist. A father. |
33 | // And many things. |
34 | // But above all, a writer. |
35 | // Thank You. :)" |
36 | // - Pieter Hintjens |
37 | |
38 | #include "precompiled.hpp" |
39 | #define ZMQ_TYPE_UNSAFE |
40 | |
41 | #include "macros.hpp" |
42 | #include "poller.hpp" |
43 | |
44 | #if !defined ZMQ_HAVE_POLLER |
45 | // On AIX platform, poll.h has to be included first to get consistent |
46 | // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' |
47 | // instead of 'events' and 'revents' and defines macros to map from POSIX-y |
48 | // names to AIX-specific names). |
49 | #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS |
50 | #include <poll.h> |
51 | #endif |
52 | |
53 | #include "polling_util.hpp" |
54 | #endif |
55 | |
56 | // TODO: determine if this is an issue, since zmq.h is being loaded from pch. |
57 | // zmq.h must be included *after* poll.h for AIX to build properly |
58 | //#include "../include/zmq.h" |
59 | |
60 | #if !defined ZMQ_HAVE_WINDOWS |
61 | #include <unistd.h> |
62 | #ifdef ZMQ_HAVE_VXWORKS |
63 | #include <strings.h> |
64 | #endif |
65 | #endif |
66 | |
67 | // XSI vector I/O |
68 | #if defined ZMQ_HAVE_UIO |
69 | #include <sys/uio.h> |
70 | #else |
71 | struct iovec |
72 | { |
73 | void *iov_base; |
74 | size_t iov_len; |
75 | }; |
76 | #endif |
77 | |
78 | #include <string.h> |
79 | #include <stdlib.h> |
80 | #include <new> |
81 | #include <climits> |
82 | |
83 | #include "proxy.hpp" |
84 | #include "socket_base.hpp" |
85 | #include "stdint.hpp" |
86 | #include "config.hpp" |
87 | #include "likely.hpp" |
88 | #include "clock.hpp" |
89 | #include "ctx.hpp" |
90 | #include "err.hpp" |
91 | #include "msg.hpp" |
92 | #include "fd.hpp" |
93 | #include "metadata.hpp" |
94 | #include "socket_poller.hpp" |
95 | #include "timers.hpp" |
96 | #include "ip.hpp" |
97 | #include "address.hpp" |
98 | |
99 | #if defined ZMQ_HAVE_OPENPGM |
100 | #define __PGM_WININT_H__ |
101 | #include <pgm/pgm.h> |
102 | #endif |
103 | |
104 | // Compile time check whether msg_t fits into zmq_msg_t. |
105 | typedef char |
106 | check_msg_t_size[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; |
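// In C++11 and later this is equivalent to
// static_assert (sizeof (zmq::msg_t) == sizeof (zmq_msg_t), ...); the
// negative array size makes the typedef ill-formed when the sizes differ.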
107 | |
108 | |
109 | void zmq_version (int *major_, int *minor_, int *patch_) |
110 | { |
111 | *major_ = ZMQ_VERSION_MAJOR; |
112 | *minor_ = ZMQ_VERSION_MINOR; |
113 | *patch_ = ZMQ_VERSION_PATCH; |
114 | } |
115 | |
116 | |
117 | const char *zmq_strerror (int errnum_) |
118 | { |
119 | return zmq::errno_to_string (errnum_); |
120 | } |
121 | |
122 | int zmq_errno (void) |
123 | { |
124 | return errno; |
125 | } |
126 | |
127 | |
128 | // New context API |
129 | |
130 | void *zmq_ctx_new (void) |
131 | { |
132 | // We do this before the ctx constructor since its embedded mailbox_t |
133 | // object needs the network to be up and running (at least on Windows). |
134 | if (!zmq::initialize_network ()) { |
135 | return NULL; |
136 | } |
137 | |
138 | // Create 0MQ context. |
139 | zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; |
140 | if (ctx) { |
141 | if (!ctx->valid ()) { |
142 | delete ctx; |
143 | return NULL; |
144 | } |
145 | } |
146 | return ctx; |
147 | } |
148 | |
149 | int zmq_ctx_term (void *ctx_) |
150 | { |
151 | if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { |
152 | errno = EFAULT; |
153 | return -1; |
154 | } |
155 | |
156 | int rc = (static_cast<zmq::ctx_t *> (ctx_))->terminate (); |
157 | int en = errno; |
158 | |
159 | // Shut down only if termination was not interrupted by a signal. |
160 | if (!rc || en != EINTR) { |
161 | zmq::shutdown_network (); |
162 | } |
163 | |
164 | errno = en; |
165 | return rc; |
166 | } |
167 | |
168 | int zmq_ctx_shutdown (void *ctx_) |
169 | { |
170 | if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { |
171 | errno = EFAULT; |
172 | return -1; |
173 | } |
174 | return (static_cast<zmq::ctx_t *> (ctx_))->shutdown (); |
175 | } |
176 | |
177 | int zmq_ctx_set (void *ctx_, int option_, int optval_) |
178 | { |
179 | return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int)); |
180 | } |
181 | |
182 | int zmq_ctx_set_ext (void *ctx_, |
183 | int option_, |
184 | const void *optval_, |
185 | size_t optvallen_) |
186 | { |
187 | if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { |
188 | errno = EFAULT; |
189 | return -1; |
190 | } |
191 | return (static_cast<zmq::ctx_t *> (ctx_)) |
192 | ->set (option_, optval_, optvallen_); |
193 | } |
194 | |
195 | int zmq_ctx_get (void *ctx_, int option_) |
196 | { |
197 | int optval = 0; |
198 | size_t optvallen = sizeof (int); |
199 | if (zmq_ctx_get_ext (ctx_, option_, &optval, &optvallen) == 0) { |
200 | return optval; |
201 | } |
202 | |
203 | errno = EFAULT; |
204 | return -1; |
205 | } |
206 | |
207 | int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_) |
208 | { |
209 | if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { |
210 | errno = EFAULT; |
211 | return -1; |
212 | } |
213 | return (static_cast<zmq::ctx_t *> (ctx_)) |
214 | ->get (option_, optval_, optvallen_); |
215 | } |
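
// Illustrative usage of the context API (a sketch only, shown as a comment so
// it has no effect on the build; error handling is omitted):
//
//     void *ctx = zmq_ctx_new ();
//     zmq_ctx_set (ctx, ZMQ_IO_THREADS, 2);
//     int io_threads = zmq_ctx_get (ctx, ZMQ_IO_THREADS);   //  == 2
//     //  ... create and use sockets ...
//     zmq_ctx_term (ctx);   //  blocks until all sockets are closed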
216 | |
217 | |
218 | // Stable/legacy context API |
219 | |
220 | void *zmq_init (int io_threads_) |
221 | { |
222 | if (io_threads_ >= 0) { |
223 | void *ctx = zmq_ctx_new (); |
224 | zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_); |
225 | return ctx; |
226 | } |
227 | errno = EINVAL; |
228 | return NULL; |
229 | } |
230 | |
231 | int zmq_term (void *ctx_) |
232 | { |
233 | return zmq_ctx_term (ctx_); |
234 | } |
235 | |
236 | int zmq_ctx_destroy (void *ctx_) |
237 | { |
238 | return zmq_ctx_term (ctx_); |
239 | } |
240 | |
241 | |
242 | // Sockets |
243 | |
244 | static zmq::socket_base_t *as_socket_base_t (void *s_) |
245 | { |
246 | zmq::socket_base_t *s = static_cast<zmq::socket_base_t *> (s_); |
247 | if (!s_ || !s->check_tag ()) { |
248 | errno = ENOTSOCK; |
249 | return NULL; |
250 | } |
251 | return s; |
252 | } |
253 | |
254 | void *zmq_socket (void *ctx_, int type_) |
255 | { |
256 | if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) { |
257 | errno = EFAULT; |
258 | return NULL; |
259 | } |
260 | zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_); |
261 | zmq::socket_base_t *s = ctx->create_socket (type_); |
262 | return (void *) s; |
263 | } |
264 | |
265 | int zmq_close (void *s_) |
266 | { |
267 | zmq::socket_base_t *s = as_socket_base_t (s_); |
268 | if (!s) |
269 | return -1; |
270 | s->close (); |
271 | return 0; |
272 | } |
273 | |
274 | int zmq_setsockopt (void *s_, |
275 | int option_, |
276 | const void *optval_, |
277 | size_t optvallen_) |
278 | { |
279 | zmq::socket_base_t *s = as_socket_base_t (s_); |
280 | if (!s) |
281 | return -1; |
282 | return s->setsockopt (option_, optval_, optvallen_); |
283 | } |
284 | |
285 | int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) |
286 | { |
287 | zmq::socket_base_t *s = as_socket_base_t (s_); |
288 | if (!s) |
289 | return -1; |
290 | return s->getsockopt (option_, optval_, optvallen_); |
291 | } |
292 | |
293 | int zmq_socket_monitor_versioned ( |
294 | void *s_, const char *addr_, uint64_t events_, int event_version_, int type_) |
295 | { |
296 | zmq::socket_base_t *s = as_socket_base_t (s_); |
297 | if (!s) |
298 | return -1; |
299 | return s->monitor (addr_, events_, event_version_, type_); |
300 | } |
301 | |
302 | int zmq_socket_monitor (void *s_, const char *addr_, int events_) |
303 | { |
304 | return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR); |
305 | } |
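
// Illustrative monitoring setup (a sketch only, shown as a comment; "ctx",
// "s" and the inproc endpoint name are placeholders):
//
//     zmq_socket_monitor (s, "inproc://monitor.s", ZMQ_EVENT_ALL);
//     void *mon = zmq_socket (ctx, ZMQ_PAIR);
//     zmq_connect (mon, "inproc://monitor.s");
//     //  Each event arrives on 'mon' as a multi-part message: the first
//     //  frame carries the event number and value, the second the endpoint.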
306 | |
307 | int zmq_join (void *s_, const char *group_) |
308 | { |
309 | zmq::socket_base_t *s = as_socket_base_t (s_); |
310 | if (!s) |
311 | return -1; |
312 | return s->join (group_); |
313 | } |
314 | |
315 | int zmq_leave (void *s_, const char *group_) |
316 | { |
317 | zmq::socket_base_t *s = as_socket_base_t (s_); |
318 | if (!s) |
319 | return -1; |
320 | return s->leave (group_); |
321 | } |
322 | |
323 | int zmq_bind (void *s_, const char *addr_) |
324 | { |
325 | zmq::socket_base_t *s = as_socket_base_t (s_); |
326 | if (!s) |
327 | return -1; |
328 | return s->bind (addr_); |
329 | } |
330 | |
331 | int zmq_connect (void *s_, const char *addr_) |
332 | { |
333 | zmq::socket_base_t *s = as_socket_base_t (s_); |
334 | if (!s) |
335 | return -1; |
336 | return s->connect (addr_); |
337 | } |
338 | |
339 | int zmq_unbind (void *s_, const char *addr_) |
340 | { |
341 | zmq::socket_base_t *s = as_socket_base_t (s_); |
342 | if (!s) |
343 | return -1; |
344 | return s->term_endpoint (addr_); |
345 | } |
346 | |
347 | int zmq_disconnect (void *s_, const char *addr_) |
348 | { |
349 | zmq::socket_base_t *s = as_socket_base_t (s_); |
350 | if (!s) |
351 | return -1; |
352 | return s->term_endpoint (addr_); |
353 | } |
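
// Illustrative socket lifecycle (a sketch only, shown as a comment; "ctx" and
// the inproc endpoint are placeholders, and the send/recv helpers used here
// are defined later in this file; error handling is omitted):
//
//     void *pull = zmq_socket (ctx, ZMQ_PULL);
//     zmq_bind (pull, "inproc://example");
//     void *push = zmq_socket (ctx, ZMQ_PUSH);
//     zmq_connect (push, "inproc://example");
//     zmq_send (push, "hello", 5, 0);
//     char buf[5];
//     zmq_recv (pull, buf, sizeof buf, 0);
//     zmq_close (push);
//     zmq_close (pull);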
354 | |
355 | // Sending functions. |
356 | |
357 | static inline int |
358 | s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) |
359 | { |
360 | size_t sz = zmq_msg_size (msg_); |
361 | int rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_); |
362 | if (unlikely (rc < 0)) |
363 | return -1; |
364 | |
365 | // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09 |
366 | // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ); |
367 | size_t max_msgsz = INT_MAX; |
368 | |
369 | // Truncate returned size to INT_MAX to avoid overflow to negative values |
370 | return static_cast<int> (sz < max_msgsz ? sz : max_msgsz); |
371 | } |
372 | |
373 | /* To be deprecated once zmq_msg_send() is stable */ |
374 | int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) |
375 | { |
376 | return zmq_msg_send (msg_, s_, flags_); |
377 | } |
378 | |
379 | int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) |
380 | { |
381 | zmq::socket_base_t *s = as_socket_base_t (s_); |
382 | if (!s) |
383 | return -1; |
384 | zmq_msg_t msg; |
385 | if (zmq_msg_init_size (&msg, len_)) |
386 | return -1; |
387 | |
388 | // We explicitly allow a send from NULL, size zero |
389 | if (len_) { |
390 | assert (buf_); |
391 | memcpy (zmq_msg_data (&msg), buf_, len_); |
392 | } |
393 | int rc = s_sendmsg (s, &msg, flags_); |
394 | if (unlikely (rc < 0)) { |
395 | int err = errno; |
396 | int rc2 = zmq_msg_close (&msg); |
397 | errno_assert (rc2 == 0); |
398 | errno = err; |
399 | return -1; |
400 | } |
    // Note the optimisation here. We don't close the msg object as it is
    // empty anyway. This may change when the implementation of zmq_msg_t
    // changes.
403 | return rc; |
404 | } |
405 | |
406 | int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) |
407 | { |
408 | zmq::socket_base_t *s = as_socket_base_t (s_); |
409 | if (!s) |
410 | return -1; |
411 | zmq_msg_t msg; |
412 | int rc = |
413 | zmq_msg_init_data (&msg, const_cast<void *> (buf_), len_, NULL, NULL); |
414 | if (rc != 0) |
415 | return -1; |
416 | |
417 | rc = s_sendmsg (s, &msg, flags_); |
418 | if (unlikely (rc < 0)) { |
419 | int err = errno; |
420 | int rc2 = zmq_msg_close (&msg); |
421 | errno_assert (rc2 == 0); |
422 | errno = err; |
423 | return -1; |
424 | } |
    // Note the optimisation here. We don't close the msg object as it is
    // empty anyway. This may change when the implementation of zmq_msg_t
    // changes.
427 | return rc; |
428 | } |
429 | |
430 | |
431 | // Send multiple messages. |
432 | // TODO: this function has no man page |
433 | // |
// If the flag bit ZMQ_SNDMORE is set, the vector is treated as
// a single multi-part message, i.e. the ZMQ_SNDMORE bit is cleared
// on the last part. A usage sketch follows the function.
437 | // |
438 | int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) |
439 | { |
440 | zmq::socket_base_t *s = as_socket_base_t (s_); |
441 | if (!s) |
442 | return -1; |
443 | if (unlikely (count_ <= 0 || !a_)) { |
444 | errno = EINVAL; |
445 | return -1; |
446 | } |
447 | |
448 | int rc = 0; |
449 | zmq_msg_t msg; |
450 | |
451 | for (size_t i = 0; i < count_; ++i) { |
452 | rc = zmq_msg_init_size (&msg, a_[i].iov_len); |
453 | if (rc != 0) { |
454 | rc = -1; |
455 | break; |
456 | } |
457 | memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len); |
458 | if (i == count_ - 1) |
459 | flags_ = flags_ & ~ZMQ_SNDMORE; |
460 | rc = s_sendmsg (s, &msg, flags_); |
461 | if (unlikely (rc < 0)) { |
462 | int err = errno; |
463 | int rc2 = zmq_msg_close (&msg); |
464 | errno_assert (rc2 == 0); |
465 | errno = err; |
466 | rc = -1; |
467 | break; |
468 | } |
469 | } |
470 | return rc; |
471 | } |
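
// Illustrative use of zmq_sendiov (a sketch only, shown as a comment; "push"
// is a placeholder socket): passing ZMQ_SNDMORE sends the vector as one
// two-part message, with the flag cleared on the last part automatically.
//
//     struct iovec iov[2];
//     iov[0].iov_base = (void *) "part-1";
//     iov[0].iov_len = 6;
//     iov[1].iov_base = (void *) "part-2";
//     iov[1].iov_len = 6;
//     zmq_sendiov (push, iov, 2, ZMQ_SNDMORE);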
472 | |
473 | // Receiving functions. |
474 | |
475 | static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) |
476 | { |
477 | int rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_); |
478 | if (unlikely (rc < 0)) |
479 | return -1; |
480 | |
481 | // Truncate returned size to INT_MAX to avoid overflow to negative values |
482 | size_t sz = zmq_msg_size (msg_); |
483 | return static_cast<int> (sz < INT_MAX ? sz : INT_MAX); |
484 | } |
485 | |
486 | /* To be deprecated once zmq_msg_recv() is stable */ |
487 | int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) |
488 | { |
489 | return zmq_msg_recv (msg_, s_, flags_); |
490 | } |
491 | |
492 | |
493 | int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) |
494 | { |
495 | zmq::socket_base_t *s = as_socket_base_t (s_); |
496 | if (!s) |
497 | return -1; |
498 | zmq_msg_t msg; |
499 | int rc = zmq_msg_init (&msg); |
500 | errno_assert (rc == 0); |
501 | |
502 | int nbytes = s_recvmsg (s, &msg, flags_); |
503 | if (unlikely (nbytes < 0)) { |
504 | int err = errno; |
505 | rc = zmq_msg_close (&msg); |
506 | errno_assert (rc == 0); |
507 | errno = err; |
508 | return -1; |
509 | } |
510 | |
511 | // An oversized message is silently truncated. |
512 | size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_; |
513 | |
514 | // We explicitly allow a null buffer argument if len is zero |
515 | if (to_copy) { |
516 | assert (buf_); |
517 | memcpy (buf_, zmq_msg_data (&msg), to_copy); |
518 | } |
519 | rc = zmq_msg_close (&msg); |
520 | errno_assert (rc == 0); |
521 | |
522 | return nbytes; |
523 | } |
524 | |
525 | // Receive a multi-part message |
526 | // |
527 | // Receives up to *count_ parts of a multi-part message. |
528 | // Sets *count_ to the actual number of parts read. |
529 | // ZMQ_RCVMORE is set to indicate if a complete multi-part message was read. |
530 | // Returns number of message parts read, or -1 on error. |
531 | // |
// Note: even if -1 is returned, some parts of the message
// may already have been read. The caller must therefore consult
// *count_ to find out how many parts were successfully read.
//
// The iov_base buffers filled into each iovec of *a_ by this
// function are allocated with malloc () and should be released by
// the caller using free (). A usage sketch follows the function.
539 | // TODO: this function has no man page |
540 | // |
541 | int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) |
542 | { |
543 | zmq::socket_base_t *s = as_socket_base_t (s_); |
544 | if (!s) |
545 | return -1; |
546 | if (unlikely (!count_ || *count_ <= 0 || !a_)) { |
547 | errno = EINVAL; |
548 | return -1; |
549 | } |
550 | |
551 | size_t count = *count_; |
552 | int nread = 0; |
553 | bool recvmore = true; |
554 | |
555 | *count_ = 0; |
556 | |
557 | for (size_t i = 0; recvmore && i < count; ++i) { |
558 | zmq_msg_t msg; |
559 | int rc = zmq_msg_init (&msg); |
560 | errno_assert (rc == 0); |
561 | |
562 | int nbytes = s_recvmsg (s, &msg, flags_); |
563 | if (unlikely (nbytes < 0)) { |
564 | int err = errno; |
565 | rc = zmq_msg_close (&msg); |
566 | errno_assert (rc == 0); |
567 | errno = err; |
568 | nread = -1; |
569 | break; |
570 | } |
571 | |
572 | a_[i].iov_len = zmq_msg_size (&msg); |
573 | a_[i].iov_base = static_cast<char *> (malloc (a_[i].iov_len)); |
574 | if (unlikely (!a_[i].iov_base)) { |
575 | errno = ENOMEM; |
576 | return -1; |
577 | } |
578 | memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)), |
579 | a_[i].iov_len); |
        // Assume the socket's ZMQ_RCVMORE option is properly set.
581 | const zmq::msg_t *p_msg = reinterpret_cast<const zmq::msg_t *> (&msg); |
582 | recvmore = p_msg->flags () & zmq::msg_t::more; |
583 | rc = zmq_msg_close (&msg); |
584 | errno_assert (rc == 0); |
585 | ++*count_; |
586 | ++nread; |
587 | } |
588 | return nread; |
589 | } |
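
// Illustrative use of zmq_recviov (a sketch only, shown as a comment; "pull"
// is a placeholder socket): the caller owns the returned buffers.
//
//     struct iovec iov[2];
//     size_t count = 2;
//     int nread = zmq_recviov (pull, iov, &count, 0);
//     for (size_t i = 0; i < count; i++)
//         free (iov[i].iov_base);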
590 | |
591 | // Message manipulators. |
592 | |
593 | int zmq_msg_init (zmq_msg_t *msg_) |
594 | { |
595 | return (reinterpret_cast<zmq::msg_t *> (msg_))->init (); |
596 | } |
597 | |
598 | int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) |
599 | { |
600 | return (reinterpret_cast<zmq::msg_t *> (msg_))->init_size (size_); |
601 | } |
602 | |
603 | int zmq_msg_init_data ( |
604 | zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_) |
605 | { |
606 | return (reinterpret_cast<zmq::msg_t *> (msg_)) |
607 | ->init_data (data_, size_, ffn_, hint_); |
608 | } |
609 | |
610 | int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_) |
611 | { |
612 | zmq::socket_base_t *s = as_socket_base_t (s_); |
613 | if (!s) |
614 | return -1; |
615 | return s_sendmsg (s, msg_, flags_); |
616 | } |
617 | |
618 | int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_) |
619 | { |
620 | zmq::socket_base_t *s = as_socket_base_t (s_); |
621 | if (!s) |
622 | return -1; |
623 | return s_recvmsg (s, msg_, flags_); |
624 | } |
625 | |
626 | int zmq_msg_close (zmq_msg_t *msg_) |
627 | { |
628 | return (reinterpret_cast<zmq::msg_t *> (msg_))->close (); |
629 | } |
630 | |
631 | int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) |
632 | { |
633 | return (reinterpret_cast<zmq::msg_t *> (dest_)) |
634 | ->move (*reinterpret_cast<zmq::msg_t *> (src_)); |
635 | } |
636 | |
637 | int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) |
638 | { |
639 | return (reinterpret_cast<zmq::msg_t *> (dest_)) |
640 | ->copy (*reinterpret_cast<zmq::msg_t *> (src_)); |
641 | } |
642 | |
643 | void *zmq_msg_data (zmq_msg_t *msg_) |
644 | { |
645 | return (reinterpret_cast<zmq::msg_t *> (msg_))->data (); |
646 | } |
647 | |
648 | size_t zmq_msg_size (const zmq_msg_t *msg_) |
649 | { |
650 | return ((zmq::msg_t *) msg_)->size (); |
651 | } |
652 | |
653 | int zmq_msg_more (const zmq_msg_t *msg_) |
654 | { |
655 | return zmq_msg_get (msg_, ZMQ_MORE); |
656 | } |
657 | |
658 | int zmq_msg_get (const zmq_msg_t *msg_, int property_) |
659 | { |
660 | const char *fd_string; |
661 | |
662 | switch (property_) { |
663 | case ZMQ_MORE: |
664 | return (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::more) ? 1 : 0; |
665 | case ZMQ_SRCFD: |
666 | fd_string = zmq_msg_gets (msg_, "__fd" ); |
667 | if (fd_string == NULL) |
668 | return -1; |
669 | |
670 | return atoi (fd_string); |
671 | case ZMQ_SHARED: |
672 | return (((zmq::msg_t *) msg_)->is_cmsg ()) |
673 | || (((zmq::msg_t *) msg_)->flags () & zmq::msg_t::shared) |
674 | ? 1 |
675 | : 0; |
676 | default: |
677 | errno = EINVAL; |
678 | return -1; |
679 | } |
680 | } |
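
// Illustrative multi-part receive loop (a sketch only, shown as a comment;
// "s" is a placeholder socket): ZMQ_MORE, or the zmq_msg_more shortcut,
// indicates whether another frame of the same message follows.
//
//     int more;
//     do {
//         zmq_msg_t part;
//         zmq_msg_init (&part);
//         zmq_msg_recv (&part, s, 0);
//         more = zmq_msg_get (&part, ZMQ_MORE);
//         //  ... use zmq_msg_data (&part) and zmq_msg_size (&part) ...
//         zmq_msg_close (&part);
//     } while (more);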
681 | |
682 | int zmq_msg_set (zmq_msg_t *, int, int) |
683 | { |
684 | // No properties supported at present |
685 | errno = EINVAL; |
686 | return -1; |
687 | } |
688 | |
689 | int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_) |
690 | { |
691 | return (reinterpret_cast<zmq::msg_t *> (msg_)) |
692 | ->set_routing_id (routing_id_); |
693 | } |
694 | |
695 | uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) |
696 | { |
697 | return (reinterpret_cast<zmq::msg_t *> (msg_))->get_routing_id (); |
698 | } |
699 | |
700 | int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_) |
701 | { |
702 | return (reinterpret_cast<zmq::msg_t *> (msg_))->set_group (group_); |
703 | } |
704 | |
705 | const char *zmq_msg_group (zmq_msg_t *msg_) |
706 | { |
707 | return (reinterpret_cast<zmq::msg_t *> (msg_))->group (); |
708 | } |
709 | |
710 | // Get message metadata string |
711 | |
712 | const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) |
713 | { |
714 | const zmq::metadata_t *metadata = |
715 | reinterpret_cast<const zmq::msg_t *> (msg_)->metadata (); |
716 | const char *value = NULL; |
717 | if (metadata) |
718 | value = metadata->get (std::string (property_)); |
719 | if (value) |
720 | return value; |
721 | |
722 | errno = EINVAL; |
723 | return NULL; |
724 | } |
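
// Illustrative use of zmq_msg_gets (a sketch only, shown as a comment; "s" is
// a placeholder socket): metadata such as "Peer-Address" or "Socket-Type" is
// available on received messages.
//
//     zmq_msg_t msg;
//     zmq_msg_init (&msg);
//     zmq_msg_recv (&msg, s, 0);
//     const char *peer = zmq_msg_gets (&msg, "Peer-Address");
//     if (peer)
//         printf ("received from %s\n", peer);
//     zmq_msg_close (&msg);   //  also invalidates the string returned above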
725 | |
726 | // Polling. |
727 | |
728 | #if defined ZMQ_HAVE_POLLER |
729 | inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) |
730 | { |
731 | // implement zmq_poll on top of zmq_poller |
732 | int rc; |
733 | zmq_poller_event_t *events; |
734 | zmq::socket_poller_t poller; |
735 | events = new (std::nothrow) zmq_poller_event_t[nitems_]; |
736 | alloc_assert (events); |
737 | |
738 | bool repeat_items = false; |
739 | // Register sockets with poller |
740 | for (int i = 0; i < nitems_; i++) { |
741 | items_[i].revents = 0; |
742 | |
743 | bool modify = false; |
744 | short e = items_[i].events; |
745 | if (items_[i].socket) { |
746 | // Poll item is a 0MQ socket. |
747 | for (int j = 0; j < i; ++j) { |
748 | // Check for repeat entries |
749 | if (items_[j].socket == items_[i].socket) { |
750 | repeat_items = true; |
751 | modify = true; |
752 | e |= items_[j].events; |
753 | } |
754 | } |
755 | if (modify) { |
756 | rc = zmq_poller_modify (&poller, items_[i].socket, e); |
757 | } else { |
758 | rc = zmq_poller_add (&poller, items_[i].socket, NULL, e); |
759 | } |
760 | if (rc < 0) { |
761 | delete[] events; |
762 | return rc; |
763 | } |
764 | } else { |
765 | // Poll item is a raw file descriptor. |
766 | for (int j = 0; j < i; ++j) { |
767 | // Check for repeat entries |
768 | if (!items_[j].socket && items_[j].fd == items_[i].fd) { |
769 | repeat_items = true; |
770 | modify = true; |
771 | e |= items_[j].events; |
772 | } |
773 | } |
774 | if (modify) { |
775 | rc = zmq_poller_modify_fd (&poller, items_[i].fd, e); |
776 | } else { |
777 | rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e); |
778 | } |
779 | if (rc < 0) { |
780 | delete[] events; |
781 | return rc; |
782 | } |
783 | } |
784 | } |
785 | |
786 | // Wait for events |
787 | rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_); |
788 | if (rc < 0) { |
789 | delete[] events; |
790 | if (zmq_errno () == EAGAIN) { |
791 | return 0; |
792 | } |
793 | return rc; |
794 | } |
795 | |
796 | // Transform poller events into zmq_pollitem events. |
797 | // items_ contains all items, while events only contains fired events. |
    // If no sockets are repeated (the common case), the two keep the same
    // relative order, so step through the items and match each one only
    // against the next unmatched event.
    // If there are repeated items, that ordering cannot be assumed, so each
    // poll item must scan the fired events from the beginning.
802 | int j_start = 0, found_events = rc; |
803 | for (int i = 0; i < nitems_; i++) { |
804 | for (int j = j_start; j < found_events; ++j) { |
805 | if ((items_[i].socket && items_[i].socket == events[j].socket) |
806 | || (!(items_[i].socket || events[j].socket) |
807 | && items_[i].fd == events[j].fd)) { |
808 | items_[i].revents = events[j].events & items_[i].events; |
809 | if (!repeat_items) { |
810 | // no repeats, we can ignore events we've already seen |
811 | j_start++; |
812 | } |
813 | break; |
814 | } |
815 | if (!repeat_items) { |
816 | // no repeats, never have to look at j > j_start |
817 | break; |
818 | } |
819 | } |
820 | } |
821 | |
822 | // Cleanup |
823 | delete[] events; |
824 | return rc; |
825 | } |
826 | #endif // ZMQ_HAVE_POLLER |
827 | |
828 | int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) |
829 | { |
830 | #if defined ZMQ_HAVE_POLLER |
    // If the poller is available, use it whenever at least one thread-safe
    // socket is involved; otherwise fall back to the previous implementation,
    // which is faster.
833 | for (int i = 0; i != nitems_; i++) { |
834 | if (items_[i].socket |
835 | && as_socket_base_t (items_[i].socket)->is_thread_safe ()) { |
836 | return zmq_poller_poll (items_, nitems_, timeout_); |
837 | } |
838 | } |
839 | #endif // ZMQ_HAVE_POLLER |
840 | #if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT |
841 | if (unlikely (nitems_ < 0)) { |
842 | errno = EINVAL; |
843 | return -1; |
844 | } |
845 | if (unlikely (nitems_ == 0)) { |
846 | if (timeout_ == 0) |
847 | return 0; |
848 | #if defined ZMQ_HAVE_WINDOWS |
849 | Sleep (timeout_ > 0 ? timeout_ : INFINITE); |
850 | return 0; |
851 | #elif defined ZMQ_HAVE_VXWORKS |
852 | struct timespec ns_; |
853 | ns_.tv_sec = timeout_ / 1000; |
854 | ns_.tv_nsec = timeout_ % 1000 * 1000000; |
855 | return nanosleep (&ns_, 0); |
856 | #else |
857 | return usleep (timeout_ * 1000); |
858 | #endif |
859 | } |
860 | if (!items_) { |
861 | errno = EFAULT; |
862 | return -1; |
863 | } |
864 | |
865 | zmq::clock_t clock; |
866 | uint64_t now = 0; |
867 | uint64_t end = 0; |
868 | #if defined ZMQ_POLL_BASED_ON_POLL |
869 | zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_); |
870 | |
871 | // Build pollset for poll () system call. |
872 | for (int i = 0; i != nitems_; i++) { |
873 | // If the poll item is a 0MQ socket, we poll on the file descriptor |
874 | // retrieved by the ZMQ_FD socket option. |
875 | if (items_[i].socket) { |
876 | size_t zmq_fd_size = sizeof (zmq::fd_t); |
877 | if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd, |
878 | &zmq_fd_size) |
879 | == -1) { |
880 | return -1; |
881 | } |
882 | pollfds[i].events = items_[i].events ? POLLIN : 0; |
883 | } |
884 | // Else, the poll item is a raw file descriptor. Just convert the |
885 | // events to normal POLLIN/POLLOUT for poll (). |
886 | else { |
887 | pollfds[i].fd = items_[i].fd; |
888 | pollfds[i].events = |
889 | (items_[i].events & ZMQ_POLLIN ? POLLIN : 0) |
890 | | (items_[i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
891 | | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0); |
892 | } |
893 | } |
894 | #else |
895 | // Ensure we do not attempt to select () on more than FD_SETSIZE |
896 | // file descriptors. |
897 | // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here |
898 | zmq_assert (nitems_ <= FD_SETSIZE); |
899 | |
900 | zmq::optimized_fd_set_t pollset_in (nitems_); |
901 | FD_ZERO (pollset_in.get ()); |
902 | zmq::optimized_fd_set_t pollset_out (nitems_); |
903 | FD_ZERO (pollset_out.get ()); |
904 | zmq::optimized_fd_set_t pollset_err (nitems_); |
905 | FD_ZERO (pollset_err.get ()); |
906 | |
907 | zmq::fd_t maxfd = 0; |
908 | |
909 | // Build the fd_sets for passing to select (). |
910 | for (int i = 0; i != nitems_; i++) { |
911 | // If the poll item is a 0MQ socket we are interested in input on the |
912 | // notification file descriptor retrieved by the ZMQ_FD socket option. |
913 | if (items_[i].socket) { |
914 | size_t zmq_fd_size = sizeof (zmq::fd_t); |
915 | zmq::fd_t notify_fd; |
916 | if (zmq_getsockopt (items_[i].socket, ZMQ_FD, ¬ify_fd, |
917 | &zmq_fd_size) |
918 | == -1) |
919 | return -1; |
920 | if (items_[i].events) { |
921 | FD_SET (notify_fd, pollset_in.get ()); |
922 | if (maxfd < notify_fd) |
923 | maxfd = notify_fd; |
924 | } |
925 | } |
926 | // Else, the poll item is a raw file descriptor. Convert the poll item |
927 | // events to the appropriate fd_sets. |
928 | else { |
929 | if (items_[i].events & ZMQ_POLLIN) |
930 | FD_SET (items_[i].fd, pollset_in.get ()); |
931 | if (items_[i].events & ZMQ_POLLOUT) |
932 | FD_SET (items_[i].fd, pollset_out.get ()); |
933 | if (items_[i].events & ZMQ_POLLERR) |
934 | FD_SET (items_[i].fd, pollset_err.get ()); |
935 | if (maxfd < items_[i].fd) |
936 | maxfd = items_[i].fd; |
937 | } |
938 | } |
939 | |
940 | zmq::optimized_fd_set_t inset (nitems_); |
941 | zmq::optimized_fd_set_t outset (nitems_); |
942 | zmq::optimized_fd_set_t errset (nitems_); |
943 | #endif |
944 | |
945 | bool first_pass = true; |
946 | int nevents = 0; |
947 | |
948 | while (true) { |
949 | #if defined ZMQ_POLL_BASED_ON_POLL |
950 | |
951 | // Compute the timeout for the subsequent poll. |
952 | zmq::timeout_t timeout = |
953 | zmq::compute_timeout (first_pass, timeout_, now, end); |
954 | |
955 | // Wait for events. |
956 | { |
957 | int rc = poll (&pollfds[0], nitems_, timeout); |
958 | if (rc == -1 && errno == EINTR) { |
959 | return -1; |
960 | } |
961 | errno_assert (rc >= 0); |
962 | } |
963 | // Check for the events. |
964 | for (int i = 0; i != nitems_; i++) { |
965 | items_[i].revents = 0; |
966 | |
967 | // The poll item is a 0MQ socket. Retrieve pending events |
968 | // using the ZMQ_EVENTS socket option. |
969 | if (items_[i].socket) { |
970 | size_t zmq_events_size = sizeof (uint32_t); |
971 | uint32_t zmq_events; |
972 | if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events, |
973 | &zmq_events_size) |
974 | == -1) { |
975 | return -1; |
976 | } |
977 | if ((items_[i].events & ZMQ_POLLOUT) |
978 | && (zmq_events & ZMQ_POLLOUT)) |
979 | items_[i].revents |= ZMQ_POLLOUT; |
980 | if ((items_[i].events & ZMQ_POLLIN) |
981 | && (zmq_events & ZMQ_POLLIN)) |
982 | items_[i].revents |= ZMQ_POLLIN; |
983 | } |
984 | // Else, the poll item is a raw file descriptor, simply convert |
985 | // the events to zmq_pollitem_t-style format. |
986 | else { |
987 | if (pollfds[i].revents & POLLIN) |
988 | items_[i].revents |= ZMQ_POLLIN; |
989 | if (pollfds[i].revents & POLLOUT) |
990 | items_[i].revents |= ZMQ_POLLOUT; |
991 | if (pollfds[i].revents & POLLPRI) |
992 | items_[i].revents |= ZMQ_POLLPRI; |
993 | if (pollfds[i].revents & ~(POLLIN | POLLOUT | POLLPRI)) |
994 | items_[i].revents |= ZMQ_POLLERR; |
995 | } |
996 | |
997 | if (items_[i].revents) |
998 | nevents++; |
999 | } |
1000 | |
1001 | #else |
1002 | |
1003 | // Compute the timeout for the subsequent poll. |
1004 | timeval timeout; |
1005 | timeval *ptimeout; |
1006 | if (first_pass) { |
1007 | timeout.tv_sec = 0; |
1008 | timeout.tv_usec = 0; |
1009 | ptimeout = &timeout; |
1010 | } else if (timeout_ < 0) |
1011 | ptimeout = NULL; |
1012 | else { |
1013 | timeout.tv_sec = static_cast<long> ((end - now) / 1000); |
1014 | timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000); |
1015 | ptimeout = &timeout; |
1016 | } |
1017 | |
1018 | // Wait for events. Ignore interrupts if there's infinite timeout. |
1019 | while (true) { |
1020 | memcpy (inset.get (), pollset_in.get (), |
1021 | zmq::valid_pollset_bytes (*pollset_in.get ())); |
1022 | memcpy (outset.get (), pollset_out.get (), |
1023 | zmq::valid_pollset_bytes (*pollset_out.get ())); |
1024 | memcpy (errset.get (), pollset_err.get (), |
1025 | zmq::valid_pollset_bytes (*pollset_err.get ())); |
1026 | #if defined ZMQ_HAVE_WINDOWS |
1027 | int rc = |
1028 | select (0, inset.get (), outset.get (), errset.get (), ptimeout); |
1029 | if (unlikely (rc == SOCKET_ERROR)) { |
1030 | errno = zmq::wsa_error_to_errno (WSAGetLastError ()); |
1031 | wsa_assert (errno == ENOTSOCK); |
1032 | return -1; |
1033 | } |
1034 | #else |
1035 | int rc = select (maxfd + 1, inset.get (), outset.get (), |
1036 | errset.get (), ptimeout); |
1037 | if (unlikely (rc == -1)) { |
1038 | errno_assert (errno == EINTR || errno == EBADF); |
1039 | return -1; |
1040 | } |
1041 | #endif |
1042 | break; |
1043 | } |
1044 | |
1045 | // Check for the events. |
1046 | for (int i = 0; i != nitems_; i++) { |
1047 | items_[i].revents = 0; |
1048 | |
1049 | // The poll item is a 0MQ socket. Retrieve pending events |
1050 | // using the ZMQ_EVENTS socket option. |
1051 | if (items_[i].socket) { |
1052 | size_t zmq_events_size = sizeof (uint32_t); |
1053 | uint32_t zmq_events; |
1054 | if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events, |
1055 | &zmq_events_size) |
1056 | == -1) |
1057 | return -1; |
1058 | if ((items_[i].events & ZMQ_POLLOUT) |
1059 | && (zmq_events & ZMQ_POLLOUT)) |
1060 | items_[i].revents |= ZMQ_POLLOUT; |
1061 | if ((items_[i].events & ZMQ_POLLIN) |
1062 | && (zmq_events & ZMQ_POLLIN)) |
1063 | items_[i].revents |= ZMQ_POLLIN; |
1064 | } |
1065 | // Else, the poll item is a raw file descriptor, simply convert |
1066 | // the events to zmq_pollitem_t-style format. |
1067 | else { |
1068 | if (FD_ISSET (items_[i].fd, inset.get ())) |
1069 | items_[i].revents |= ZMQ_POLLIN; |
1070 | if (FD_ISSET (items_[i].fd, outset.get ())) |
1071 | items_[i].revents |= ZMQ_POLLOUT; |
1072 | if (FD_ISSET (items_[i].fd, errset.get ())) |
1073 | items_[i].revents |= ZMQ_POLLERR; |
1074 | } |
1075 | |
1076 | if (items_[i].revents) |
1077 | nevents++; |
1078 | } |
1079 | #endif |
1080 | |
1081 | // If timeout is zero, exit immediately whether there are events or not. |
1082 | if (timeout_ == 0) |
1083 | break; |
1084 | |
1085 | // If there are events to return, we can exit immediately. |
1086 | if (nevents) |
1087 | break; |
1088 | |
1089 | // At this point we are meant to wait for events but there are none. |
1090 | // If timeout is infinite we can just loop until we get some events. |
1091 | if (timeout_ < 0) { |
1092 | if (first_pass) |
1093 | first_pass = false; |
1094 | continue; |
1095 | } |
1096 | |
        // The timeout is finite and there are no events. In the first pass
        // we get a timestamp of when the polling has begun. (We assume that
        // the first pass took negligible time.) We also compute the time
        // when the polling should time out.
1101 | if (first_pass) { |
1102 | now = clock.now_ms (); |
1103 | end = now + timeout_; |
1104 | if (now == end) |
1105 | break; |
1106 | first_pass = false; |
1107 | continue; |
1108 | } |
1109 | |
        // Find out whether the timeout has expired.
1111 | now = clock.now_ms (); |
1112 | if (now >= end) |
1113 | break; |
1114 | } |
1115 | |
1116 | return nevents; |
1117 | #else |
1118 | // Exotic platforms that support neither poll() nor select(). |
1119 | errno = ENOTSUP; |
1120 | return -1; |
1121 | #endif |
1122 | } |
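
// Illustrative use of zmq_poll (a sketch only, shown as a comment; "sub" and
// "raw_fd" are placeholders): wait up to one second for input on a 0MQ socket
// and on a raw file descriptor.
//
//     zmq_pollitem_t items[2];
//     items[0].socket = sub;
//     items[0].events = ZMQ_POLLIN;
//     items[1].socket = NULL;   //  NULL socket means poll the raw fd
//     items[1].fd = raw_fd;
//     items[1].events = ZMQ_POLLIN;
//     int rc = zmq_poll (items, 2, 1000);
//     if (rc > 0 && (items[0].revents & ZMQ_POLLIN)) {
//         //  'sub' has at least one message ready to read
//     }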
1123 | |
1124 | // The poller functionality |
1125 | |
1126 | void *zmq_poller_new (void) |
1127 | { |
1128 | zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t; |
1129 | if (!poller) { |
1130 | errno = ENOMEM; |
1131 | } |
1132 | return poller; |
1133 | } |
1134 | |
1135 | int zmq_poller_destroy (void **poller_p_) |
1136 | { |
1137 | if (poller_p_) { |
1138 | zmq::socket_poller_t *const poller = |
1139 | static_cast<zmq::socket_poller_t *> (*poller_p_); |
1140 | if (poller && poller->check_tag ()) { |
1141 | delete poller; |
1142 | *poller_p_ = NULL; |
1143 | return 0; |
1144 | } |
1145 | } |
1146 | errno = EFAULT; |
1147 | return -1; |
1148 | } |
1149 | |
1150 | |
1151 | static int check_poller (void *const poller_) |
1152 | { |
1153 | if (!poller_ |
1154 | || !(static_cast<zmq::socket_poller_t *> (poller_))->check_tag ()) { |
1155 | errno = EFAULT; |
1156 | return -1; |
1157 | } |
1158 | |
1159 | return 0; |
1160 | } |
1161 | |
1162 | static int check_events (const short events_) |
1163 | { |
1164 | if (events_ & ~(ZMQ_POLLIN | ZMQ_POLLOUT | ZMQ_POLLERR | ZMQ_POLLPRI)) { |
1165 | errno = EINVAL; |
1166 | return -1; |
1167 | } |
1168 | return 0; |
1169 | } |
1170 | |
1171 | static int check_poller_registration_args (void *const poller_, void *const s_) |
1172 | { |
1173 | if (-1 == check_poller (poller_)) |
1174 | return -1; |
1175 | |
1176 | if (!s_ || !(static_cast<zmq::socket_base_t *> (s_))->check_tag ()) { |
1177 | errno = ENOTSOCK; |
1178 | return -1; |
1179 | } |
1180 | |
1181 | return 0; |
1182 | } |
1183 | |
1184 | static int check_poller_fd_registration_args (void *const poller_, |
1185 | const zmq::fd_t fd_) |
1186 | { |
1187 | if (-1 == check_poller (poller_)) |
1188 | return -1; |
1189 | |
1190 | if (fd_ == zmq::retired_fd) { |
1191 | errno = EBADF; |
1192 | return -1; |
1193 | } |
1194 | |
1195 | return 0; |
1196 | } |
1197 | |
1198 | int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_) |
1199 | { |
1200 | if (-1 == check_poller_registration_args (poller_, s_) |
1201 | || -1 == check_events (events_)) |
1202 | return -1; |
1203 | |
1204 | zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_); |
1205 | |
1206 | return (static_cast<zmq::socket_poller_t *> (poller_)) |
1207 | ->add (socket, user_data_, events_); |
1208 | } |
1209 | |
1210 | int zmq_poller_add_fd (void *poller_, |
1211 | zmq::fd_t fd_, |
1212 | void *user_data_, |
1213 | short events_) |
1214 | { |
1215 | if (-1 == check_poller_fd_registration_args (poller_, fd_) |
1216 | || -1 == check_events (events_)) |
1217 | return -1; |
1218 | |
1219 | return (static_cast<zmq::socket_poller_t *> (poller_)) |
1220 | ->add_fd (fd_, user_data_, events_); |
1221 | } |
1222 | |
1223 | |
1224 | int zmq_poller_modify (void *poller_, void *s_, short events_) |
1225 | { |
1226 | if (-1 == check_poller_registration_args (poller_, s_) |
1227 | || -1 == check_events (events_)) |
1228 | return -1; |
1229 | |
1230 | zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_); |
1231 | |
1232 | return (static_cast<zmq::socket_poller_t *> (poller_)) |
1233 | ->modify (socket, events_); |
1234 | } |
1235 | |
1236 | int zmq_poller_modify_fd (void *poller_, zmq::fd_t fd_, short events_) |
1237 | { |
1238 | if (-1 == check_poller_fd_registration_args (poller_, fd_) |
1239 | || -1 == check_events (events_)) |
1240 | return -1; |
1241 | |
1242 | return (static_cast<zmq::socket_poller_t *> (poller_)) |
1243 | ->modify_fd (fd_, events_); |
1244 | } |
1245 | |
1246 | int zmq_poller_remove (void *poller_, void *s_) |
1247 | { |
1248 | if (-1 == check_poller_registration_args (poller_, s_)) |
1249 | return -1; |
1250 | |
1251 | zmq::socket_base_t *socket = static_cast<zmq::socket_base_t *> (s_); |
1252 | |
1253 | return (static_cast<zmq::socket_poller_t *> (poller_))->remove (socket); |
1254 | } |
1255 | |
1256 | int zmq_poller_remove_fd (void *poller_, zmq::fd_t fd_) |
1257 | { |
1258 | if (-1 == check_poller_fd_registration_args (poller_, fd_)) |
1259 | return -1; |
1260 | |
1261 | return (static_cast<zmq::socket_poller_t *> (poller_))->remove_fd (fd_); |
1262 | } |
1263 | |
1264 | int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_) |
1265 | { |
1266 | int rc = zmq_poller_wait_all (poller_, event_, 1, timeout_); |
1267 | |
1268 | if (rc < 0 && event_) { |
        // TODO this is not strictly portable: zmq_poller_event_t contains
        // pointers, and a null pointer need not be represented by all-zero
        // bits.
1271 | memset (event_, 0, sizeof (zmq_poller_event_t)); |
1272 | } |
    // wait_all returns the number of events, but we return 0 for any success.
1274 | return rc >= 0 ? 0 : rc; |
1275 | } |
1276 | |
1277 | int zmq_poller_wait_all (void *poller_, |
1278 | zmq_poller_event_t *events_, |
1279 | int n_events_, |
1280 | long timeout_) |
1281 | { |
1282 | if (-1 == check_poller (poller_)) |
1283 | return -1; |
1284 | |
1285 | if (!events_) { |
1286 | errno = EFAULT; |
1287 | return -1; |
1288 | } |
1289 | if (n_events_ < 0) { |
1290 | errno = EINVAL; |
1291 | return -1; |
1292 | } |
1293 | |
1294 | int rc = |
1295 | (static_cast<zmq::socket_poller_t *> (poller_)) |
1296 | ->wait (reinterpret_cast<zmq::socket_poller_t::event_t *> (events_), |
1297 | n_events_, timeout_); |
1298 | |
1299 | return rc; |
1300 | } |
1301 | |
1302 | int zmq_poller_fd (void *poller_, zmq_fd_t *fd_) |
1303 | { |
1304 | if (!poller_ |
1305 | || !(static_cast<zmq::socket_poller_t *> (poller_)->check_tag ())) { |
1306 | errno = EFAULT; |
1307 | return -1; |
1308 | } |
1309 | return static_cast<zmq::socket_poller_t *> (poller_)->signaler_fd (fd_); |
1310 | } |
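
// Illustrative use of the poller API (a sketch only, shown as a comment;
// "sub" is a placeholder socket, and this API may require a draft build
// depending on the libzmq version): register a socket, wait for one event,
// then destroy the poller.
//
//     void *poller = zmq_poller_new ();
//     zmq_poller_add (poller, sub, NULL, ZMQ_POLLIN);
//     zmq_poller_event_t event;
//     if (zmq_poller_wait (poller, &event, 1000) == 0) {
//         //  event.socket == sub and event.events has ZMQ_POLLIN set
//     }
//     zmq_poller_destroy (&poller);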
1311 | |
1312 | // Peer-specific state |
1313 | |
1314 | int zmq_socket_get_peer_state (void *s_, |
1315 | const void *routing_id_, |
1316 | size_t routing_id_size_) |
1317 | { |
1318 | const zmq::socket_base_t *const s = as_socket_base_t (s_); |
1319 | if (!s) |
1320 | return -1; |
1321 | |
1322 | return s->get_peer_state (routing_id_, routing_id_size_); |
1323 | } |
1324 | |
1325 | // Timers |
1326 | |
1327 | void *zmq_timers_new (void) |
1328 | { |
1329 | zmq::timers_t *timers = new (std::nothrow) zmq::timers_t; |
1330 | alloc_assert (timers); |
1331 | return timers; |
1332 | } |
1333 | |
1334 | int zmq_timers_destroy (void **timers_p_) |
1335 | { |
1336 | void *timers = *timers_p_; |
1337 | if (!timers || !(static_cast<zmq::timers_t *> (timers))->check_tag ()) { |
1338 | errno = EFAULT; |
1339 | return -1; |
1340 | } |
1341 | delete (static_cast<zmq::timers_t *> (timers)); |
1342 | *timers_p_ = NULL; |
1343 | return 0; |
1344 | } |
1345 | |
1346 | int zmq_timers_add (void *timers_, |
1347 | size_t interval_, |
1348 | zmq_timer_fn handler_, |
1349 | void *arg_) |
1350 | { |
1351 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1352 | errno = EFAULT; |
1353 | return -1; |
1354 | } |
1355 | |
1356 | return (static_cast<zmq::timers_t *> (timers_)) |
1357 | ->add (interval_, handler_, arg_); |
1358 | } |
1359 | |
1360 | int zmq_timers_cancel (void *timers_, int timer_id_) |
1361 | { |
1362 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1363 | errno = EFAULT; |
1364 | return -1; |
1365 | } |
1366 | |
1367 | return (static_cast<zmq::timers_t *> (timers_))->cancel (timer_id_); |
1368 | } |
1369 | |
1370 | int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_) |
1371 | { |
1372 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1373 | errno = EFAULT; |
1374 | return -1; |
1375 | } |
1376 | |
1377 | return (static_cast<zmq::timers_t *> (timers_)) |
1378 | ->set_interval (timer_id_, interval_); |
1379 | } |
1380 | |
1381 | int zmq_timers_reset (void *timers_, int timer_id_) |
1382 | { |
1383 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1384 | errno = EFAULT; |
1385 | return -1; |
1386 | } |
1387 | |
1388 | return (static_cast<zmq::timers_t *> (timers_))->reset (timer_id_); |
1389 | } |
1390 | |
1391 | long zmq_timers_timeout (void *timers_) |
1392 | { |
1393 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1394 | errno = EFAULT; |
1395 | return -1; |
1396 | } |
1397 | |
1398 | return (static_cast<zmq::timers_t *> (timers_))->timeout (); |
1399 | } |
1400 | |
1401 | int zmq_timers_execute (void *timers_) |
1402 | { |
1403 | if (!timers_ || !(static_cast<zmq::timers_t *> (timers_))->check_tag ()) { |
1404 | errno = EFAULT; |
1405 | return -1; |
1406 | } |
1407 | |
1408 | return (static_cast<zmq::timers_t *> (timers_))->execute (); |
1409 | } |
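
// Illustrative use of the timers API (a sketch only, shown as a comment;
// "handler" and "run" are placeholders): timers never fire on their own, the
// application calls zmq_timers_execute () from its own loop.
//
//     static void handler (int timer_id, void *arg) { /* ... */ }
//
//     void *timers = zmq_timers_new ();
//     int id = zmq_timers_add (timers, 1000, handler, NULL);   //  every 1000 ms
//     while (run) {
//         //  ... wait up to zmq_timers_timeout (timers) milliseconds ...
//         zmq_timers_execute (timers);
//     }
//     zmq_timers_cancel (timers, id);
//     zmq_timers_destroy (&timers);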
1410 | |
1411 | // The proxy functionality |
1412 | |
1413 | int zmq_proxy (void *frontend_, void *backend_, void *capture_) |
1414 | { |
1415 | if (!frontend_ || !backend_) { |
1416 | errno = EFAULT; |
1417 | return -1; |
1418 | } |
1419 | return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_), |
1420 | static_cast<zmq::socket_base_t *> (backend_), |
1421 | static_cast<zmq::socket_base_t *> (capture_)); |
1422 | } |
1423 | |
1424 | int zmq_proxy_steerable (void *frontend_, |
1425 | void *backend_, |
1426 | void *capture_, |
1427 | void *control_) |
1428 | { |
1429 | if (!frontend_ || !backend_) { |
1430 | errno = EFAULT; |
1431 | return -1; |
1432 | } |
1433 | return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_), |
1434 | static_cast<zmq::socket_base_t *> (backend_), |
1435 | static_cast<zmq::socket_base_t *> (capture_), |
1436 | static_cast<zmq::socket_base_t *> (control_)); |
1437 | } |
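
// Illustrative use of zmq_proxy (a sketch only, shown as a comment; "ctx" and
// the tcp endpoints are placeholders): a classic XSUB/XPUB forwarder. The
// call blocks until the owning context is terminated.
//
//     void *frontend = zmq_socket (ctx, ZMQ_XSUB);
//     zmq_bind (frontend, "tcp://*:5559");
//     void *backend = zmq_socket (ctx, ZMQ_XPUB);
//     zmq_bind (backend, "tcp://*:5560");
//     zmq_proxy (frontend, backend, NULL);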
1438 | |
1439 | // The deprecated device functionality |
1440 | |
1441 | int zmq_device (int /* type */, void *frontend_, void *backend_) |
1442 | { |
1443 | return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_), |
1444 | static_cast<zmq::socket_base_t *> (backend_), NULL); |
1445 | } |
1446 | |
1447 | // Probe library capabilities; for now, reports on transport and security |
1448 | |
1449 | int zmq_has (const char *capability_) |
1450 | { |
1451 | #if defined(ZMQ_HAVE_IPC) |
1452 | if (strcmp (capability_, zmq::protocol_name::ipc) == 0) |
1453 | return true; |
1454 | #endif |
1455 | #if defined(ZMQ_HAVE_OPENPGM) |
1456 | if (strcmp (capability_, "pgm" ) == 0) |
1457 | return true; |
1458 | #endif |
1459 | #if defined(ZMQ_HAVE_TIPC) |
1460 | if (strcmp (capability_, zmq::protocol_name::tipc) == 0) |
1461 | return true; |
1462 | #endif |
1463 | #if defined(ZMQ_HAVE_NORM) |
1464 | if (strcmp (capability_, "norm" ) == 0) |
1465 | return true; |
1466 | #endif |
1467 | #if defined(ZMQ_HAVE_CURVE) |
1468 | if (strcmp (capability_, "curve" ) == 0) |
1469 | return true; |
1470 | #endif |
1471 | #if defined(HAVE_LIBGSSAPI_KRB5) |
1472 | if (strcmp (capability_, "gssapi" ) == 0) |
1473 | return true; |
1474 | #endif |
1475 | #if defined(ZMQ_HAVE_VMCI) |
1476 | if (strcmp (capability_, zmq::protocol_name::vmci) == 0) |
1477 | return true; |
1478 | #endif |
1479 | #if defined(ZMQ_BUILD_DRAFT_API) |
1480 | if (strcmp (capability_, "draft" ) == 0) |
1481 | return true; |
1482 | #endif |
1483 | #if defined(ZMQ_HAVE_WS) |
1484 | if (strcmp (capability_, "WS" ) == 0) |
1485 | return true; |
1486 | #endif |
1487 | #if defined(ZMQ_HAVE_WSS) |
1488 | if (strcmp (capability_, "WSS" ) == 0) |
1489 | return true; |
1490 | #endif |
1491 | // Whatever the application asked for, we don't have |
1492 | return false; |
1493 | } |
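
// Illustrative use of zmq_has (a sketch only, shown as a comment):
//
//     const bool has_curve = zmq_has ("curve") != 0;
//     const bool has_ipc = zmq_has ("ipc") != 0;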
1494 | |
1495 | int zmq_socket_monitor_pipes_stats (void *s_) |
1496 | { |
1497 | zmq::socket_base_t *s = as_socket_base_t (s_); |
1498 | if (!s) |
1499 | return -1; |
1500 | return s->query_pipes_stats (); |
1501 | } |
1502 | |