1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31
32#ifdef ZMQ_HAVE_OPENPGM
33
34#ifdef ZMQ_HAVE_LINUX
35#include <poll.h>
36#endif
37
38#include <stdlib.h>
39#include <string.h>
40#include <string>
41
42#include "options.hpp"
43#include "pgm_socket.hpp"
44#include "config.hpp"
45#include "err.hpp"
46#include "random.hpp"
47#include "stdint.hpp"
48
49#ifndef MSG_ERRQUEUE
50#define MSG_ERRQUEUE 0x2000
51#endif
52
53zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
54 sock (NULL),
55 options (options_),
56 receiver (receiver_),
57 pgm_msgv (NULL),
58 pgm_msgv_len (0),
59 nbytes_rec (0),
60 nbytes_processed (0),
61 pgm_msgv_processed (0)
62{
63}
64
65// Resolve PGM socket address.
66// network_ of the form <interface & multicast group decls>:<IP port>
67// e.g. eth0;239.192.0.1:7500
68// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
69// ;[fe80::1%en0]:7500
70int zmq::pgm_socket_t::init_address (const char *network_,
71 struct pgm_addrinfo_t **res,
72 uint16_t *port_number)
73{
74 // Parse port number, start from end for IPv6
75 const char *port_delim = strrchr (network_, ':');
76 if (!port_delim) {
77 errno = EINVAL;
78 return -1;
79 }
80
81 *port_number = atoi (port_delim + 1);
82
83 char network[256];
84 if (port_delim - network_ >= (int) sizeof (network) - 1) {
85 errno = EINVAL;
86 return -1;
87 }
88 memset (network, '\0', sizeof (network));
89 memcpy (network, network_, port_delim - network_);
90
91 pgm_error_t *pgm_error = NULL;
92 struct pgm_addrinfo_t hints;
93
94 memset (&hints, 0, sizeof (hints));
95 hints.ai_family = AF_UNSPEC;
96 if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
97 // Invalid parameters don't set pgm_error_t.
98 zmq_assert (pgm_error != NULL);
99 if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
100
101 // NB: cannot catch EAI_BADFLAGS.
102 (pgm_error->code != PGM_ERROR_SERVICE
103 && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
104 // User, host, or network configuration or transient error.
105 pgm_error_free (pgm_error);
106 errno = EINVAL;
107 return -1;
108 }
109
110 // Fatal OpenPGM internal error.
111 zmq_assert (false);
112 }
113 return 0;
114}
115
116// Create, bind and connect PGM socket.
117int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
118{
119 // Can not open transport before destroying old one.
120 zmq_assert (sock == NULL);
121 zmq_assert (options.rate > 0);
122
123 // Zero counter used in msgrecv.
124 nbytes_rec = 0;
125 nbytes_processed = 0;
126 pgm_msgv_processed = 0;
127
128 uint16_t port_number;
129 struct pgm_addrinfo_t *res = NULL;
130 sa_family_t sa_family;
131
132 pgm_error_t *pgm_error = NULL;
133
134 if (init_address (network_, &res, &port_number) < 0) {
135 goto err_abort;
136 }
137
138 zmq_assert (res != NULL);
139
140 // Pick up detected IP family.
141 sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
142
143 // Create IP/PGM or UDP/PGM socket.
144 if (udp_encapsulation_) {
145 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
146 &pgm_error)) {
147 // Invalid parameters don't set pgm_error_t.
148 zmq_assert (pgm_error != NULL);
149 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
150 && (pgm_error->code != PGM_ERROR_BADF
151 && pgm_error->code != PGM_ERROR_FAULT
152 && pgm_error->code != PGM_ERROR_NOPROTOOPT
153 && pgm_error->code != PGM_ERROR_FAILED))
154
155 // User, host, or network configuration or transient error.
156 goto err_abort;
157
158 // Fatal OpenPGM internal error.
159 zmq_assert (false);
160 }
161
162 // All options are of data type int
163 const int encapsulation_port = port_number;
164 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
165 &encapsulation_port, sizeof (encapsulation_port)))
166 goto err_abort;
167 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
168 &encapsulation_port, sizeof (encapsulation_port)))
169 goto err_abort;
170 } else {
171 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
172 &pgm_error)) {
173 // Invalid parameters don't set pgm_error_t.
174 zmq_assert (pgm_error != NULL);
175 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
176 && (pgm_error->code != PGM_ERROR_BADF
177 && pgm_error->code != PGM_ERROR_FAULT
178 && pgm_error->code != PGM_ERROR_NOPROTOOPT
179 && pgm_error->code != PGM_ERROR_FAILED))
180
181 // User, host, or network configuration or transient error.
182 goto err_abort;
183
184 // Fatal OpenPGM internal error.
185 zmq_assert (false);
186 }
187 }
188
189 {
190 const int rcvbuf = (int) options.rcvbuf;
191 if (rcvbuf >= 0) {
192 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
193 sizeof (rcvbuf)))
194 goto err_abort;
195 }
196
197 const int sndbuf = (int) options.sndbuf;
198 if (sndbuf >= 0) {
199 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
200 sizeof (sndbuf)))
201 goto err_abort;
202 }
203
204 const int max_tpdu = (int) options.multicast_maxtpdu;
205 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
206 sizeof (max_tpdu)))
207 goto err_abort;
208 }
209
210 if (receiver) {
211 const int recv_only = 1, rxw_max_tpdu = (int) options.multicast_maxtpdu,
212 rxw_sqns = compute_sqns (rxw_max_tpdu),
213 peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25),
214 nak_bo_ivl = pgm_msecs (50), nak_rpt_ivl = pgm_msecs (200),
215 nak_rdata_ivl = pgm_msecs (200), nak_data_retries = 50,
216 nak_ncf_retries = 50;
217
218 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
219 sizeof (recv_only))
220 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
221 sizeof (rxw_sqns))
222 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY,
223 &peer_expiry, sizeof (peer_expiry))
224 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY,
225 &spmr_expiry, sizeof (spmr_expiry))
226 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
227 sizeof (nak_bo_ivl))
228 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL,
229 &nak_rpt_ivl, sizeof (nak_rpt_ivl))
230 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
231 &nak_rdata_ivl, sizeof (nak_rdata_ivl))
232 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
233 &nak_data_retries, sizeof (nak_data_retries))
234 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
235 &nak_ncf_retries, sizeof (nak_ncf_retries)))
236 goto err_abort;
237 } else {
238 const int send_only = 1, max_rte = (int) ((options.rate * 1000) / 8),
239 txw_max_tpdu = (int) options.multicast_maxtpdu,
240 txw_sqns = compute_sqns (txw_max_tpdu),
241 ambient_spm = pgm_secs (30),
242 heartbeat_spm[] = {
243 pgm_msecs (100), pgm_msecs (100), pgm_msecs (100),
244 pgm_msecs (100), pgm_msecs (1300), pgm_secs (7),
245 pgm_secs (16), pgm_secs (25), pgm_secs (30)};
246
247 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only,
248 sizeof (send_only))
249 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, &max_rte,
250 sizeof (max_rte))
251 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, &txw_sqns,
252 sizeof (txw_sqns))
253 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
254 &ambient_spm, sizeof (ambient_spm))
255 || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
256 &heartbeat_spm, sizeof (heartbeat_spm)))
257 goto err_abort;
258 }
259
260 // PGM transport GSI.
261 struct pgm_sockaddr_t addr;
262
263 memset (&addr, 0, sizeof (addr));
264 addr.sa_port = port_number;
265 addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
266
267 // Create random GSI.
268 uint32_t buf[2];
269 buf[0] = generate_random ();
270 buf[1] = generate_random ();
271 if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t *) buf, 8))
272 goto err_abort;
273
274
275 // Bind a transport to the specified network devices.
276 struct pgm_interface_req_t if_req;
277 memset (&if_req, 0, sizeof (if_req));
278 if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
279 if_req.ir_scope_id = 0;
280 if (AF_INET6 == sa_family) {
281 struct sockaddr_in6 sa6;
282 memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
283 if_req.ir_scope_id = sa6.sin6_scope_id;
284 }
285 if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
286 &if_req, sizeof (if_req), &pgm_error)) {
287 // Invalid parameters don't set pgm_error_t.
288 zmq_assert (pgm_error != NULL);
289 if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET
290 || pgm_error->domain == PGM_ERROR_DOMAIN_IF)
291 && (pgm_error->code != PGM_ERROR_INVAL
292 && pgm_error->code != PGM_ERROR_BADF
293 && pgm_error->code != PGM_ERROR_FAULT))
294
295 // User, host, or network configuration or transient error.
296 goto err_abort;
297
298 // Fatal OpenPGM internal error.
299 zmq_assert (false);
300 }
301
302 // Join IP multicast groups.
303 for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
304 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
305 &res->ai_recv_addrs[i], sizeof (struct group_req)))
306 goto err_abort;
307 }
308 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
309 &res->ai_send_addrs[0], sizeof (struct group_req)))
310 goto err_abort;
311
312 pgm_freeaddrinfo (res);
313 res = NULL;
314
315 // Set IP level parameters.
316 {
317 // Multicast loopback disabled by default
318 const int multicast_loop = 0;
319 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
320 &multicast_loop, sizeof (multicast_loop)))
321 goto err_abort;
322
323 const int multicast_hops = options.multicast_hops;
324 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
325 &multicast_hops, sizeof (multicast_hops)))
326 goto err_abort;
327
328 // Expedited Forwarding PHB for network elements, no ECN.
329 // Ignore return value due to varied runtime support.
330 const int dscp = 0x2e << 2;
331 if (AF_INET6 != sa_family)
332 pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp));
333
334 const int nonblocking = 1;
335 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking,
336 sizeof (nonblocking)))
337 goto err_abort;
338 }
339
340 // Connect PGM transport to start state machine.
341 if (!pgm_connect (sock, &pgm_error)) {
342 // Invalid parameters don't set pgm_error_t.
343 zmq_assert (pgm_error != NULL);
344 goto err_abort;
345 }
346
347 // For receiver transport preallocate pgm_msgv array.
348 if (receiver) {
349 zmq_assert (options.in_batch_size > 0);
350 size_t max_tsdu_size = get_max_tsdu_size ();
351 pgm_msgv_len = (int) options.in_batch_size / max_tsdu_size;
352 if ((int) options.in_batch_size % max_tsdu_size)
353 pgm_msgv_len++;
354 zmq_assert (pgm_msgv_len);
355
356 pgm_msgv = (pgm_msgv_t *) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
357 alloc_assert (pgm_msgv);
358 }
359
360 return 0;
361
362err_abort:
363 if (sock != NULL) {
364 pgm_close (sock, FALSE);
365 sock = NULL;
366 }
367 if (res != NULL) {
368 pgm_freeaddrinfo (res);
369 res = NULL;
370 }
371 if (pgm_error != NULL) {
372 pgm_error_free (pgm_error);
373 pgm_error = NULL;
374 }
375 errno = EINVAL;
376 return -1;
377}
378
379zmq::pgm_socket_t::~pgm_socket_t ()
380{
381 if (pgm_msgv)
382 free (pgm_msgv);
383 if (sock)
384 pgm_close (sock, TRUE);
385}
386
387// Get receiver fds. receive_fd_ is signaled for incoming packets,
388// waiting_pipe_fd_ is signaled for state driven events and data.
389void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_,
390 fd_t *waiting_pipe_fd_)
391{
392 socklen_t socklen;
393 bool rc;
394
395 zmq_assert (receive_fd_);
396 zmq_assert (waiting_pipe_fd_);
397
398 socklen = sizeof (*receive_fd_);
399 rc =
400 pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
401 zmq_assert (rc);
402 zmq_assert (socklen == sizeof (*receive_fd_));
403
404 socklen = sizeof (*waiting_pipe_fd_);
405 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
406 &socklen);
407 zmq_assert (rc);
408 zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
409}
410
411// Get fds and store them into user allocated memory.
412// send_fd is for non-blocking send wire notifications.
413// receive_fd_ is for incoming back-channel protocol packets.
414// rdata_notify_fd_ is raised for waiting repair transmissions.
415// pending_notify_fd_ is for state driven events.
416void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_,
417 fd_t *receive_fd_,
418 fd_t *rdata_notify_fd_,
419 fd_t *pending_notify_fd_)
420{
421 socklen_t socklen;
422 bool rc;
423
424 zmq_assert (send_fd_);
425 zmq_assert (receive_fd_);
426 zmq_assert (rdata_notify_fd_);
427 zmq_assert (pending_notify_fd_);
428
429 socklen = sizeof (*send_fd_);
430 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
431 zmq_assert (rc);
432 zmq_assert (socklen == sizeof (*receive_fd_));
433
434 socklen = sizeof (*receive_fd_);
435 rc =
436 pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen);
437 zmq_assert (rc);
438 zmq_assert (socklen == sizeof (*receive_fd_));
439
440 socklen = sizeof (*rdata_notify_fd_);
441 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
442 &socklen);
443 zmq_assert (rc);
444 zmq_assert (socklen == sizeof (*rdata_notify_fd_));
445
446 socklen = sizeof (*pending_notify_fd_);
447 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
448 pending_notify_fd_, &socklen);
449 zmq_assert (rc);
450 zmq_assert (socklen == sizeof (*pending_notify_fd_));
451}
452
453// Send one APDU, transmit window owned memory.
454// data_len_ must be less than one TPDU.
455size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
456{
457 size_t nbytes = 0;
458
459 const int status = pgm_send (sock, data_, data_len_, &nbytes);
460
461 // We have to write all data as one packet.
462 if (nbytes > 0) {
463 zmq_assert (status == PGM_IO_STATUS_NORMAL);
464 zmq_assert (nbytes == data_len_);
465 } else {
466 zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED
467 || status == PGM_IO_STATUS_WOULD_BLOCK);
468
469 if (status == PGM_IO_STATUS_RATE_LIMITED)
470 errno = ENOMEM;
471 else
472 errno = EBUSY;
473 }
474
475 // Save return value.
476 last_tx_status = status;
477
478 return nbytes;
479}
480
481long zmq::pgm_socket_t::get_rx_timeout ()
482{
483 if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED
484 && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
485 return -1;
486
487 struct timeval tv;
488 socklen_t optlen = sizeof (tv);
489 const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
490 last_rx_status == PGM_IO_STATUS_RATE_LIMITED
491 ? PGM_RATE_REMAIN
492 : PGM_TIME_REMAIN,
493 &tv, &optlen);
494 zmq_assert (rc);
495
496 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
497
498 return timeout;
499}
500
501long zmq::pgm_socket_t::get_tx_timeout ()
502{
503 if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
504 return -1;
505
506 struct timeval tv;
507 socklen_t optlen = sizeof (tv);
508 const bool rc =
509 pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
510 zmq_assert (rc);
511
512 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
513
514 return timeout;
515}
516
517// Return max TSDU size without fragmentation from current PGM transport.
518size_t zmq::pgm_socket_t::get_max_tsdu_size ()
519{
520 int max_tsdu = 0;
521 socklen_t optlen = sizeof (max_tsdu);
522
523 bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
524 zmq_assert (rc);
525 zmq_assert (optlen == sizeof (max_tsdu));
526 return (size_t) max_tsdu;
527}
528
529// pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len.
530// In subsequent calls data from pgm_msgv structure are returned.
531ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
532{
533 size_t raw_data_len = 0;
534
535 // We just sent all data from pgm_transport_recvmsgv up
536 // and have to return 0 that another engine in this thread is scheduled.
537 if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
538 // Reset all the counters.
539 nbytes_rec = 0;
540 nbytes_processed = 0;
541 pgm_msgv_processed = 0;
542 errno = EAGAIN;
543 return 0;
544 }
545
546 // If we have are going first time or if we have processed all pgm_msgv_t
547 // structure previously read from the pgm socket.
548 if (nbytes_rec == nbytes_processed) {
549 // Check program flow.
550 zmq_assert (pgm_msgv_processed == 0);
551 zmq_assert (nbytes_processed == 0);
552 zmq_assert (nbytes_rec == 0);
553
554 // Receive a vector of Application Protocol Domain Unit's (APDUs)
555 // from the transport.
556 pgm_error_t *pgm_error = NULL;
557
558 const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len,
559 MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
560
561 // Invalid parameters.
562 zmq_assert (status != PGM_IO_STATUS_ERROR);
563
564 last_rx_status = status;
565
566 // In a case when no ODATA/RDATA fired POLLIN event (SPM...)
567 // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
568 if (status == PGM_IO_STATUS_TIMER_PENDING) {
569 zmq_assert (nbytes_rec == 0);
570
571 // In case if no RDATA/ODATA caused POLLIN 0 is
572 // returned.
573 nbytes_rec = 0;
574 errno = EBUSY;
575 return 0;
576 }
577
578 // Send SPMR, NAK, ACK is rate limited.
579 if (status == PGM_IO_STATUS_RATE_LIMITED) {
580 zmq_assert (nbytes_rec == 0);
581
582 // In case if no RDATA/ODATA caused POLLIN 0 is returned.
583 nbytes_rec = 0;
584 errno = ENOMEM;
585 return 0;
586 }
587
588 // No peers and hence no incoming packets.
589 if (status == PGM_IO_STATUS_WOULD_BLOCK) {
590 zmq_assert (nbytes_rec == 0);
591
592 // In case if no RDATA/ODATA caused POLLIN 0 is returned.
593 nbytes_rec = 0;
594 errno = EAGAIN;
595 return 0;
596 }
597
598 // Data loss.
599 if (status == PGM_IO_STATUS_RESET) {
600 struct pgm_sk_buff_t *skb = pgm_msgv[0].msgv_skb[0];
601
602 // Save lost data TSI.
603 *tsi_ = &skb->tsi;
604 nbytes_rec = 0;
605
606 // In case of dala loss -1 is returned.
607 errno = EINVAL;
608 pgm_free_skb (skb);
609 return -1;
610 }
611
612 zmq_assert (status == PGM_IO_STATUS_NORMAL);
613 } else {
614 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
615 }
616
617 // Zero byte payloads are valid in PGM, but not 0MQ protocol.
618 zmq_assert (nbytes_rec > 0);
619
620 // Only one APDU per pgm_msgv_t structure is allowed.
621 zmq_assert (pgm_msgv[pgm_msgv_processed].msgv_len == 1);
622
623 struct pgm_sk_buff_t *skb = pgm_msgv[pgm_msgv_processed].msgv_skb[0];
624
625 // Take pointers from pgm_msgv_t structure.
626 *raw_data_ = skb->data;
627 raw_data_len = skb->len;
628
629 // Save current TSI.
630 *tsi_ = &skb->tsi;
631
632 // Move the the next pgm_msgv_t structure.
633 pgm_msgv_processed++;
634 zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
635 nbytes_processed += raw_data_len;
636
637 return raw_data_len;
638}
639
640void zmq::pgm_socket_t::process_upstream ()
641{
642 pgm_msgv_t dummy_msg;
643
644 size_t dummy_bytes = 0;
645 pgm_error_t *pgm_error = NULL;
646
647 const int status = pgm_recvmsgv (sock, &dummy_msg, 1, MSG_ERRQUEUE,
648 &dummy_bytes, &pgm_error);
649
650 // Invalid parameters.
651 zmq_assert (status != PGM_IO_STATUS_ERROR);
652
653 // No data should be returned.
654 zmq_assert (dummy_bytes == 0
655 && (status == PGM_IO_STATUS_TIMER_PENDING
656 || status == PGM_IO_STATUS_RATE_LIMITED
657 || status == PGM_IO_STATUS_WOULD_BLOCK));
658
659 last_rx_status = status;
660
661 if (status == PGM_IO_STATUS_TIMER_PENDING)
662 errno = EBUSY;
663 else if (status == PGM_IO_STATUS_RATE_LIMITED)
664 errno = ENOMEM;
665 else
666 errno = EAGAIN;
667}
668
669int zmq::pgm_socket_t::compute_sqns (int tpdu_)
670{
671 // Convert rate into B/ms.
672 uint64_t rate = uint64_t (options.rate) / 8;
673
674 // Compute the size of the buffer in bytes.
675 uint64_t size = uint64_t (options.recovery_ivl) * rate;
676
677 // Translate the size into number of packets.
678 uint64_t sqns = size / tpdu_;
679
680 // Buffer should be able to hold at least one packet.
681 if (sqns == 0)
682 sqns = 1;
683
684 return (int) sqns;
685}
686
687#endif
688