| 1 | /* |
| 2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
| 3 | |
| 4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
| 5 | |
| 6 | libzmq is free software; you can redistribute it and/or modify it under |
| 7 | the terms of the GNU Lesser General Public License (LGPL) as published |
| 8 | by the Free Software Foundation; either version 3 of the License, or |
| 9 | (at your option) any later version. |
| 10 | |
| 11 | As a special exception, the Contributors give you permission to link |
| 12 | this library with independent modules to produce an executable, |
| 13 | regardless of the license terms of these independent modules, and to |
| 14 | copy and distribute the resulting executable under terms of your choice, |
| 15 | provided that you also meet, for each linked independent module, the |
| 16 | terms and conditions of the license of that module. An independent |
| 17 | module is a module which is not derived from or based on this library. |
| 18 | If you modify this library, you must extend this exception to your |
| 19 | version of the library. |
| 20 | |
| 21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
| 22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 24 | License for more details. |
| 25 | |
| 26 | You should have received a copy of the GNU Lesser General Public License |
| 27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 28 | */ |
| 29 | |
| 30 | #include "precompiled.hpp" |
| 31 | |
| 32 | #ifdef ZMQ_HAVE_OPENPGM |
| 33 | |
| 34 | #ifdef ZMQ_HAVE_LINUX |
| 35 | #include <poll.h> |
| 36 | #endif |
| 37 | |
| 38 | #include <stdlib.h> |
| 39 | #include <string.h> |
| 40 | #include <string> |
| 41 | |
| 42 | #include "options.hpp" |
| 43 | #include "pgm_socket.hpp" |
| 44 | #include "config.hpp" |
| 45 | #include "err.hpp" |
| 46 | #include "random.hpp" |
| 47 | #include "stdint.hpp" |
| 48 | |
//  Fallback for platforms whose headers do not provide MSG_ERRQUEUE; the
//  flag is passed to pgm_recvmsgv below.  (The value presumably mirrors the
//  Linux definition -- TODO confirm.)
#if !defined MSG_ERRQUEUE
#define MSG_ERRQUEUE 0x2000
#endif
| 52 | |
| 53 | zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : |
| 54 | sock (NULL), |
| 55 | options (options_), |
| 56 | receiver (receiver_), |
| 57 | pgm_msgv (NULL), |
| 58 | pgm_msgv_len (0), |
| 59 | nbytes_rec (0), |
| 60 | nbytes_processed (0), |
| 61 | pgm_msgv_processed (0) |
| 62 | { |
| 63 | } |
| 64 | |
//  Resolve PGM socket address.
//  network_ of the form <interface & multicast group decls>:<IP port>
//  e.g. eth0;239.192.0.1:7500
//       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//       ;[fe80::1%en0]:7500
//
//  On success returns 0, stores the parsed port in *port_number and lets
//  pgm_getaddrinfo fill *res (init () releases it with pgm_freeaddrinfo).
//  On a malformed address, an out-of-range port or a configuration-type
//  resolver error returns -1 with errno set to EINVAL. Any other resolver
//  failure is treated as an internal OpenPGM error and asserts.
int zmq::pgm_socket_t::init_address (const char *network_,
                                     struct pgm_addrinfo_t **res,
                                     uint16_t *port_number)
{
    //  Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

    //  strtol parses exactly like atoi for in-range input (a non-numeric
    //  port still yields 0) but has defined behaviour on overflow, which
    //  lets us reject values that would otherwise be silently wrapped when
    //  narrowed to uint16_t.
    const long port = strtol (port_delim + 1, NULL, 10);
    if (port < 0 || port > 65535) {
        errno = EINVAL;
        return -1;
    }
    *port_number = (uint16_t) port;

    //  Copy everything in front of the port delimiter into a zero-filled,
    //  NUL-terminated scratch buffer for the resolver.
    char network[256];
    if (port_delim - network_ >= (int) sizeof (network) - 1) {
        errno = EINVAL;
        return -1;
    }
    memset (network, '\0', sizeof (network));
    memcpy (network, network_, port_delim - network_);

    pgm_error_t *pgm_error = NULL;

    //  No hints are supplied to the resolver.
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
        //  Invalid parameters don't set pgm_error_t.
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&

            //  NB: cannot catch EAI_BADFLAGS.
            (pgm_error->code != PGM_ERROR_SERVICE
             && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
            //  User, host, or network configuration or transient error.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }

        //  Fatal OpenPGM internal error.
        zmq_assert (false);
    }
    return 0;
}
| 115 | |
| 116 | // Create, bind and connect PGM socket. |
| 117 | int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) |
| 118 | { |
| 119 | // Can not open transport before destroying old one. |
| 120 | zmq_assert (sock == NULL); |
| 121 | zmq_assert (options.rate > 0); |
| 122 | |
| 123 | // Zero counter used in msgrecv. |
| 124 | nbytes_rec = 0; |
| 125 | nbytes_processed = 0; |
| 126 | pgm_msgv_processed = 0; |
| 127 | |
| 128 | uint16_t port_number; |
| 129 | struct pgm_addrinfo_t *res = NULL; |
| 130 | sa_family_t sa_family; |
| 131 | |
| 132 | pgm_error_t *pgm_error = NULL; |
| 133 | |
| 134 | if (init_address (network_, &res, &port_number) < 0) { |
| 135 | goto err_abort; |
| 136 | } |
| 137 | |
| 138 | zmq_assert (res != NULL); |
| 139 | |
| 140 | // Pick up detected IP family. |
| 141 | sa_family = res->ai_send_addrs[0].gsr_group.ss_family; |
| 142 | |
| 143 | // Create IP/PGM or UDP/PGM socket. |
| 144 | if (udp_encapsulation_) { |
| 145 | if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, |
| 146 | &pgm_error)) { |
| 147 | // Invalid parameters don't set pgm_error_t. |
| 148 | zmq_assert (pgm_error != NULL); |
| 149 | if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
| 150 | && (pgm_error->code != PGM_ERROR_BADF |
| 151 | && pgm_error->code != PGM_ERROR_FAULT |
| 152 | && pgm_error->code != PGM_ERROR_NOPROTOOPT |
| 153 | && pgm_error->code != PGM_ERROR_FAILED)) |
| 154 | |
| 155 | // User, host, or network configuration or transient error. |
| 156 | goto err_abort; |
| 157 | |
| 158 | // Fatal OpenPGM internal error. |
| 159 | zmq_assert (false); |
| 160 | } |
| 161 | |
| 162 | // All options are of data type int |
| 163 | const int encapsulation_port = port_number; |
| 164 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, |
| 165 | &encapsulation_port, sizeof (encapsulation_port))) |
| 166 | goto err_abort; |
| 167 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, |
| 168 | &encapsulation_port, sizeof (encapsulation_port))) |
| 169 | goto err_abort; |
| 170 | } else { |
| 171 | if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, |
| 172 | &pgm_error)) { |
| 173 | // Invalid parameters don't set pgm_error_t. |
| 174 | zmq_assert (pgm_error != NULL); |
| 175 | if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
| 176 | && (pgm_error->code != PGM_ERROR_BADF |
| 177 | && pgm_error->code != PGM_ERROR_FAULT |
| 178 | && pgm_error->code != PGM_ERROR_NOPROTOOPT |
| 179 | && pgm_error->code != PGM_ERROR_FAILED)) |
| 180 | |
| 181 | // User, host, or network configuration or transient error. |
| 182 | goto err_abort; |
| 183 | |
| 184 | // Fatal OpenPGM internal error. |
| 185 | zmq_assert (false); |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | { |
| 190 | const int rcvbuf = (int) options.rcvbuf; |
| 191 | if (rcvbuf >= 0) { |
| 192 | if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, |
| 193 | sizeof (rcvbuf))) |
| 194 | goto err_abort; |
| 195 | } |
| 196 | |
| 197 | const int sndbuf = (int) options.sndbuf; |
| 198 | if (sndbuf >= 0) { |
| 199 | if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, |
| 200 | sizeof (sndbuf))) |
| 201 | goto err_abort; |
| 202 | } |
| 203 | |
| 204 | const int max_tpdu = (int) options.multicast_maxtpdu; |
| 205 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, |
| 206 | sizeof (max_tpdu))) |
| 207 | goto err_abort; |
| 208 | } |
| 209 | |
| 210 | if (receiver) { |
| 211 | const int recv_only = 1, rxw_max_tpdu = (int) options.multicast_maxtpdu, |
| 212 | rxw_sqns = compute_sqns (rxw_max_tpdu), |
| 213 | peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), |
| 214 | nak_bo_ivl = pgm_msecs (50), nak_rpt_ivl = pgm_msecs (200), |
| 215 | nak_rdata_ivl = pgm_msecs (200), nak_data_retries = 50, |
| 216 | nak_ncf_retries = 50; |
| 217 | |
| 218 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, |
| 219 | sizeof (recv_only)) |
| 220 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, |
| 221 | sizeof (rxw_sqns)) |
| 222 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, |
| 223 | &peer_expiry, sizeof (peer_expiry)) |
| 224 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, |
| 225 | &spmr_expiry, sizeof (spmr_expiry)) |
| 226 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, |
| 227 | sizeof (nak_bo_ivl)) |
| 228 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, |
| 229 | &nak_rpt_ivl, sizeof (nak_rpt_ivl)) |
| 230 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, |
| 231 | &nak_rdata_ivl, sizeof (nak_rdata_ivl)) |
| 232 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, |
| 233 | &nak_data_retries, sizeof (nak_data_retries)) |
| 234 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, |
| 235 | &nak_ncf_retries, sizeof (nak_ncf_retries))) |
| 236 | goto err_abort; |
| 237 | } else { |
| 238 | const int send_only = 1, max_rte = (int) ((options.rate * 1000) / 8), |
| 239 | txw_max_tpdu = (int) options.multicast_maxtpdu, |
| 240 | txw_sqns = compute_sqns (txw_max_tpdu), |
| 241 | ambient_spm = pgm_secs (30), |
| 242 | heartbeat_spm[] = { |
| 243 | pgm_msecs (100), pgm_msecs (100), pgm_msecs (100), |
| 244 | pgm_msecs (100), pgm_msecs (1300), pgm_secs (7), |
| 245 | pgm_secs (16), pgm_secs (25), pgm_secs (30)}; |
| 246 | |
| 247 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, |
| 248 | sizeof (send_only)) |
| 249 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, &max_rte, |
| 250 | sizeof (max_rte)) |
| 251 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, &txw_sqns, |
| 252 | sizeof (txw_sqns)) |
| 253 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, |
| 254 | &ambient_spm, sizeof (ambient_spm)) |
| 255 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, |
| 256 | &heartbeat_spm, sizeof (heartbeat_spm))) |
| 257 | goto err_abort; |
| 258 | } |
| 259 | |
| 260 | // PGM transport GSI. |
| 261 | struct pgm_sockaddr_t addr; |
| 262 | |
| 263 | memset (&addr, 0, sizeof (addr)); |
| 264 | addr.sa_port = port_number; |
| 265 | addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; |
| 266 | |
| 267 | // Create random GSI. |
| 268 | uint32_t buf[2]; |
| 269 | buf[0] = generate_random (); |
| 270 | buf[1] = generate_random (); |
| 271 | if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t *) buf, 8)) |
| 272 | goto err_abort; |
| 273 | |
| 274 | |
| 275 | // Bind a transport to the specified network devices. |
| 276 | struct pgm_interface_req_t if_req; |
| 277 | memset (&if_req, 0, sizeof (if_req)); |
| 278 | if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; |
| 279 | if_req.ir_scope_id = 0; |
| 280 | if (AF_INET6 == sa_family) { |
| 281 | struct sockaddr_in6 sa6; |
| 282 | memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6)); |
| 283 | if_req.ir_scope_id = sa6.sin6_scope_id; |
| 284 | } |
| 285 | if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), |
| 286 | &if_req, sizeof (if_req), &pgm_error)) { |
| 287 | // Invalid parameters don't set pgm_error_t. |
| 288 | zmq_assert (pgm_error != NULL); |
| 289 | if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
| 290 | || pgm_error->domain == PGM_ERROR_DOMAIN_IF) |
| 291 | && (pgm_error->code != PGM_ERROR_INVAL |
| 292 | && pgm_error->code != PGM_ERROR_BADF |
| 293 | && pgm_error->code != PGM_ERROR_FAULT)) |
| 294 | |
| 295 | // User, host, or network configuration or transient error. |
| 296 | goto err_abort; |
| 297 | |
| 298 | // Fatal OpenPGM internal error. |
| 299 | zmq_assert (false); |
| 300 | } |
| 301 | |
| 302 | // Join IP multicast groups. |
| 303 | for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) { |
| 304 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, |
| 305 | &res->ai_recv_addrs[i], sizeof (struct group_req))) |
| 306 | goto err_abort; |
| 307 | } |
| 308 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, |
| 309 | &res->ai_send_addrs[0], sizeof (struct group_req))) |
| 310 | goto err_abort; |
| 311 | |
| 312 | pgm_freeaddrinfo (res); |
| 313 | res = NULL; |
| 314 | |
| 315 | // Set IP level parameters. |
| 316 | { |
| 317 | // Multicast loopback disabled by default |
| 318 | const int multicast_loop = 0; |
| 319 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, |
| 320 | &multicast_loop, sizeof (multicast_loop))) |
| 321 | goto err_abort; |
| 322 | |
| 323 | const int multicast_hops = options.multicast_hops; |
| 324 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, |
| 325 | &multicast_hops, sizeof (multicast_hops))) |
| 326 | goto err_abort; |
| 327 | |
| 328 | // Expedited Forwarding PHB for network elements, no ECN. |
| 329 | // Ignore return value due to varied runtime support. |
| 330 | const int dscp = 0x2e << 2; |
| 331 | if (AF_INET6 != sa_family) |
| 332 | pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)); |
| 333 | |
| 334 | const int nonblocking = 1; |
| 335 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, |
| 336 | sizeof (nonblocking))) |
| 337 | goto err_abort; |
| 338 | } |
| 339 | |
| 340 | // Connect PGM transport to start state machine. |
| 341 | if (!pgm_connect (sock, &pgm_error)) { |
| 342 | // Invalid parameters don't set pgm_error_t. |
| 343 | zmq_assert (pgm_error != NULL); |
| 344 | goto err_abort; |
| 345 | } |
| 346 | |
| 347 | // For receiver transport preallocate pgm_msgv array. |
| 348 | if (receiver) { |
| 349 | zmq_assert (options.in_batch_size > 0); |
| 350 | size_t max_tsdu_size = get_max_tsdu_size (); |
| 351 | pgm_msgv_len = (int) options.in_batch_size / max_tsdu_size; |
| 352 | if ((int) options.in_batch_size % max_tsdu_size) |
| 353 | pgm_msgv_len++; |
| 354 | zmq_assert (pgm_msgv_len); |
| 355 | |
| 356 | pgm_msgv = (pgm_msgv_t *) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); |
| 357 | alloc_assert (pgm_msgv); |
| 358 | } |
| 359 | |
| 360 | return 0; |
| 361 | |
| 362 | err_abort: |
| 363 | if (sock != NULL) { |
| 364 | pgm_close (sock, FALSE); |
| 365 | sock = NULL; |
| 366 | } |
| 367 | if (res != NULL) { |
| 368 | pgm_freeaddrinfo (res); |
| 369 | res = NULL; |
| 370 | } |
| 371 | if (pgm_error != NULL) { |
| 372 | pgm_error_free (pgm_error); |
| 373 | pgm_error = NULL; |
| 374 | } |
| 375 | errno = EINVAL; |
| 376 | return -1; |
| 377 | } |
| 378 | |
| 379 | zmq::pgm_socket_t::~pgm_socket_t () |
| 380 | { |
| 381 | if (pgm_msgv) |
| 382 | free (pgm_msgv); |
| 383 | if (sock) |
| 384 | pgm_close (sock, TRUE); |
| 385 | } |
| 386 | |
| 387 | // Get receiver fds. receive_fd_ is signaled for incoming packets, |
| 388 | // waiting_pipe_fd_ is signaled for state driven events and data. |
| 389 | void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, |
| 390 | fd_t *waiting_pipe_fd_) |
| 391 | { |
| 392 | socklen_t socklen; |
| 393 | bool rc; |
| 394 | |
| 395 | zmq_assert (receive_fd_); |
| 396 | zmq_assert (waiting_pipe_fd_); |
| 397 | |
| 398 | socklen = sizeof (*receive_fd_); |
| 399 | rc = |
| 400 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); |
| 401 | zmq_assert (rc); |
| 402 | zmq_assert (socklen == sizeof (*receive_fd_)); |
| 403 | |
| 404 | socklen = sizeof (*waiting_pipe_fd_); |
| 405 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, |
| 406 | &socklen); |
| 407 | zmq_assert (rc); |
| 408 | zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); |
| 409 | } |
| 410 | |
| 411 | // Get fds and store them into user allocated memory. |
| 412 | // send_fd is for non-blocking send wire notifications. |
| 413 | // receive_fd_ is for incoming back-channel protocol packets. |
| 414 | // rdata_notify_fd_ is raised for waiting repair transmissions. |
| 415 | // pending_notify_fd_ is for state driven events. |
| 416 | void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, |
| 417 | fd_t *receive_fd_, |
| 418 | fd_t *rdata_notify_fd_, |
| 419 | fd_t *pending_notify_fd_) |
| 420 | { |
| 421 | socklen_t socklen; |
| 422 | bool rc; |
| 423 | |
| 424 | zmq_assert (send_fd_); |
| 425 | zmq_assert (receive_fd_); |
| 426 | zmq_assert (rdata_notify_fd_); |
| 427 | zmq_assert (pending_notify_fd_); |
| 428 | |
| 429 | socklen = sizeof (*send_fd_); |
| 430 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); |
| 431 | zmq_assert (rc); |
| 432 | zmq_assert (socklen == sizeof (*receive_fd_)); |
| 433 | |
| 434 | socklen = sizeof (*receive_fd_); |
| 435 | rc = |
| 436 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); |
| 437 | zmq_assert (rc); |
| 438 | zmq_assert (socklen == sizeof (*receive_fd_)); |
| 439 | |
| 440 | socklen = sizeof (*rdata_notify_fd_); |
| 441 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, |
| 442 | &socklen); |
| 443 | zmq_assert (rc); |
| 444 | zmq_assert (socklen == sizeof (*rdata_notify_fd_)); |
| 445 | |
| 446 | socklen = sizeof (*pending_notify_fd_); |
| 447 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, |
| 448 | pending_notify_fd_, &socklen); |
| 449 | zmq_assert (rc); |
| 450 | zmq_assert (socklen == sizeof (*pending_notify_fd_)); |
| 451 | } |
| 452 | |
| 453 | // Send one APDU, transmit window owned memory. |
| 454 | // data_len_ must be less than one TPDU. |
| 455 | size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) |
| 456 | { |
| 457 | size_t nbytes = 0; |
| 458 | |
| 459 | const int status = pgm_send (sock, data_, data_len_, &nbytes); |
| 460 | |
| 461 | // We have to write all data as one packet. |
| 462 | if (nbytes > 0) { |
| 463 | zmq_assert (status == PGM_IO_STATUS_NORMAL); |
| 464 | zmq_assert (nbytes == data_len_); |
| 465 | } else { |
| 466 | zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED |
| 467 | || status == PGM_IO_STATUS_WOULD_BLOCK); |
| 468 | |
| 469 | if (status == PGM_IO_STATUS_RATE_LIMITED) |
| 470 | errno = ENOMEM; |
| 471 | else |
| 472 | errno = EBUSY; |
| 473 | } |
| 474 | |
| 475 | // Save return value. |
| 476 | last_tx_status = status; |
| 477 | |
| 478 | return nbytes; |
| 479 | } |
| 480 | |
| 481 | long zmq::pgm_socket_t::get_rx_timeout () |
| 482 | { |
| 483 | if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED |
| 484 | && last_rx_status != PGM_IO_STATUS_TIMER_PENDING) |
| 485 | return -1; |
| 486 | |
| 487 | struct timeval tv; |
| 488 | socklen_t optlen = sizeof (tv); |
| 489 | const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, |
| 490 | last_rx_status == PGM_IO_STATUS_RATE_LIMITED |
| 491 | ? PGM_RATE_REMAIN |
| 492 | : PGM_TIME_REMAIN, |
| 493 | &tv, &optlen); |
| 494 | zmq_assert (rc); |
| 495 | |
| 496 | const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); |
| 497 | |
| 498 | return timeout; |
| 499 | } |
| 500 | |
| 501 | long zmq::pgm_socket_t::get_tx_timeout () |
| 502 | { |
| 503 | if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) |
| 504 | return -1; |
| 505 | |
| 506 | struct timeval tv; |
| 507 | socklen_t optlen = sizeof (tv); |
| 508 | const bool rc = |
| 509 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); |
| 510 | zmq_assert (rc); |
| 511 | |
| 512 | const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); |
| 513 | |
| 514 | return timeout; |
| 515 | } |
| 516 | |
| 517 | // Return max TSDU size without fragmentation from current PGM transport. |
| 518 | size_t zmq::pgm_socket_t::get_max_tsdu_size () |
| 519 | { |
| 520 | int max_tsdu = 0; |
| 521 | socklen_t optlen = sizeof (max_tsdu); |
| 522 | |
| 523 | bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); |
| 524 | zmq_assert (rc); |
| 525 | zmq_assert (optlen == sizeof (max_tsdu)); |
| 526 | return (size_t) max_tsdu; |
| 527 | } |
| 528 | |
| 529 | // pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len. |
| 530 | // In subsequent calls data from pgm_msgv structure are returned. |
| 531 | ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) |
| 532 | { |
| 533 | size_t raw_data_len = 0; |
| 534 | |
| 535 | // We just sent all data from pgm_transport_recvmsgv up |
| 536 | // and have to return 0 that another engine in this thread is scheduled. |
| 537 | if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { |
| 538 | // Reset all the counters. |
| 539 | nbytes_rec = 0; |
| 540 | nbytes_processed = 0; |
| 541 | pgm_msgv_processed = 0; |
| 542 | errno = EAGAIN; |
| 543 | return 0; |
| 544 | } |
| 545 | |
| 546 | // If we have are going first time or if we have processed all pgm_msgv_t |
| 547 | // structure previously read from the pgm socket. |
| 548 | if (nbytes_rec == nbytes_processed) { |
| 549 | // Check program flow. |
| 550 | zmq_assert (pgm_msgv_processed == 0); |
| 551 | zmq_assert (nbytes_processed == 0); |
| 552 | zmq_assert (nbytes_rec == 0); |
| 553 | |
| 554 | // Receive a vector of Application Protocol Domain Unit's (APDUs) |
| 555 | // from the transport. |
| 556 | pgm_error_t *pgm_error = NULL; |
| 557 | |
| 558 | const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len, |
| 559 | MSG_ERRQUEUE, &nbytes_rec, &pgm_error); |
| 560 | |
| 561 | // Invalid parameters. |
| 562 | zmq_assert (status != PGM_IO_STATUS_ERROR); |
| 563 | |
| 564 | last_rx_status = status; |
| 565 | |
| 566 | // In a case when no ODATA/RDATA fired POLLIN event (SPM...) |
| 567 | // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. |
| 568 | if (status == PGM_IO_STATUS_TIMER_PENDING) { |
| 569 | zmq_assert (nbytes_rec == 0); |
| 570 | |
| 571 | // In case if no RDATA/ODATA caused POLLIN 0 is |
| 572 | // returned. |
| 573 | nbytes_rec = 0; |
| 574 | errno = EBUSY; |
| 575 | return 0; |
| 576 | } |
| 577 | |
| 578 | // Send SPMR, NAK, ACK is rate limited. |
| 579 | if (status == PGM_IO_STATUS_RATE_LIMITED) { |
| 580 | zmq_assert (nbytes_rec == 0); |
| 581 | |
| 582 | // In case if no RDATA/ODATA caused POLLIN 0 is returned. |
| 583 | nbytes_rec = 0; |
| 584 | errno = ENOMEM; |
| 585 | return 0; |
| 586 | } |
| 587 | |
| 588 | // No peers and hence no incoming packets. |
| 589 | if (status == PGM_IO_STATUS_WOULD_BLOCK) { |
| 590 | zmq_assert (nbytes_rec == 0); |
| 591 | |
| 592 | // In case if no RDATA/ODATA caused POLLIN 0 is returned. |
| 593 | nbytes_rec = 0; |
| 594 | errno = EAGAIN; |
| 595 | return 0; |
| 596 | } |
| 597 | |
| 598 | // Data loss. |
| 599 | if (status == PGM_IO_STATUS_RESET) { |
| 600 | struct pgm_sk_buff_t *skb = pgm_msgv[0].msgv_skb[0]; |
| 601 | |
| 602 | // Save lost data TSI. |
| 603 | *tsi_ = &skb->tsi; |
| 604 | nbytes_rec = 0; |
| 605 | |
| 606 | // In case of dala loss -1 is returned. |
| 607 | errno = EINVAL; |
| 608 | pgm_free_skb (skb); |
| 609 | return -1; |
| 610 | } |
| 611 | |
| 612 | zmq_assert (status == PGM_IO_STATUS_NORMAL); |
| 613 | } else { |
| 614 | zmq_assert (pgm_msgv_processed <= pgm_msgv_len); |
| 615 | } |
| 616 | |
| 617 | // Zero byte payloads are valid in PGM, but not 0MQ protocol. |
| 618 | zmq_assert (nbytes_rec > 0); |
| 619 | |
| 620 | // Only one APDU per pgm_msgv_t structure is allowed. |
| 621 | zmq_assert (pgm_msgv[pgm_msgv_processed].msgv_len == 1); |
| 622 | |
| 623 | struct pgm_sk_buff_t *skb = pgm_msgv[pgm_msgv_processed].msgv_skb[0]; |
| 624 | |
| 625 | // Take pointers from pgm_msgv_t structure. |
| 626 | *raw_data_ = skb->data; |
| 627 | raw_data_len = skb->len; |
| 628 | |
| 629 | // Save current TSI. |
| 630 | *tsi_ = &skb->tsi; |
| 631 | |
| 632 | // Move the the next pgm_msgv_t structure. |
| 633 | pgm_msgv_processed++; |
| 634 | zmq_assert (pgm_msgv_processed <= pgm_msgv_len); |
| 635 | nbytes_processed += raw_data_len; |
| 636 | |
| 637 | return raw_data_len; |
| 638 | } |
| 639 | |
| 640 | void zmq::pgm_socket_t::process_upstream () |
| 641 | { |
| 642 | pgm_msgv_t dummy_msg; |
| 643 | |
| 644 | size_t dummy_bytes = 0; |
| 645 | pgm_error_t *pgm_error = NULL; |
| 646 | |
| 647 | const int status = pgm_recvmsgv (sock, &dummy_msg, 1, MSG_ERRQUEUE, |
| 648 | &dummy_bytes, &pgm_error); |
| 649 | |
| 650 | // Invalid parameters. |
| 651 | zmq_assert (status != PGM_IO_STATUS_ERROR); |
| 652 | |
| 653 | // No data should be returned. |
| 654 | zmq_assert (dummy_bytes == 0 |
| 655 | && (status == PGM_IO_STATUS_TIMER_PENDING |
| 656 | || status == PGM_IO_STATUS_RATE_LIMITED |
| 657 | || status == PGM_IO_STATUS_WOULD_BLOCK)); |
| 658 | |
| 659 | last_rx_status = status; |
| 660 | |
| 661 | if (status == PGM_IO_STATUS_TIMER_PENDING) |
| 662 | errno = EBUSY; |
| 663 | else if (status == PGM_IO_STATUS_RATE_LIMITED) |
| 664 | errno = ENOMEM; |
| 665 | else |
| 666 | errno = EAGAIN; |
| 667 | } |
| 668 | |
| 669 | int zmq::pgm_socket_t::compute_sqns (int tpdu_) |
| 670 | { |
| 671 | // Convert rate into B/ms. |
| 672 | uint64_t rate = uint64_t (options.rate) / 8; |
| 673 | |
| 674 | // Compute the size of the buffer in bytes. |
| 675 | uint64_t size = uint64_t (options.recovery_ivl) * rate; |
| 676 | |
| 677 | // Translate the size into number of packets. |
| 678 | uint64_t sqns = size / tpdu_; |
| 679 | |
| 680 | // Buffer should be able to hold at least one packet. |
| 681 | if (sqns == 0) |
| 682 | sqns = 1; |
| 683 | |
| 684 | return (int) sqns; |
| 685 | } |
| 686 | |
| 687 | #endif |
| 688 | |