1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | |
32 | #ifdef ZMQ_HAVE_OPENPGM |
33 | |
34 | #ifdef ZMQ_HAVE_LINUX |
35 | #include <poll.h> |
36 | #endif |
37 | |
38 | #include <stdlib.h> |
39 | #include <string.h> |
40 | #include <string> |
41 | |
42 | #include "options.hpp" |
43 | #include "pgm_socket.hpp" |
44 | #include "config.hpp" |
45 | #include "err.hpp" |
46 | #include "random.hpp" |
47 | #include "stdint.hpp" |
48 | |
49 | #ifndef MSG_ERRQUEUE |
50 | #define MSG_ERRQUEUE 0x2000 |
51 | #endif |
52 | |
53 | zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : |
54 | sock (NULL), |
55 | options (options_), |
56 | receiver (receiver_), |
57 | pgm_msgv (NULL), |
58 | pgm_msgv_len (0), |
59 | nbytes_rec (0), |
60 | nbytes_processed (0), |
61 | pgm_msgv_processed (0) |
62 | { |
63 | } |
64 | |
// Resolve PGM socket address.
// network_ of the form <interface & multicast group decls>:<IP port>
// e.g. eth0;239.192.0.1:7500
//      link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
//      ;[fe80::1%en0]:7500
//
// On success returns 0, stores the resolved address information in *res
// (init () releases it with pgm_freeaddrinfo) and the parsed port in
// *port_number. On a malformed address, or a resolution failure that is
// attributed to user/host/network configuration, returns -1 with errno set
// to EINVAL. Any other resolution failure aborts through zmq_assert.
int zmq::pgm_socket_t::init_address (const char *network_,
                                     struct pgm_addrinfo_t **res,
                                     uint16_t *port_number)
{
    // Parse port number, start from end for IPv6
    const char *port_delim = strrchr (network_, ':');
    if (!port_delim) {
        errno = EINVAL;
        return -1;
    }

    // The port has to be a plain decimal number that fits into 16 bits.
    // Requiring a leading digit rejects an empty port, signs and the
    // leading whitespace strtol would otherwise skip. The range check also
    // covers strtol saturating at LONG_MAX on overflow.
    const char *port_str = port_delim + 1;
    if (*port_str < '0' || *port_str > '9') {
        errno = EINVAL;
        return -1;
    }
    char *port_end = NULL;
    const long port = strtol (port_str, &port_end, 10);
    if (*port_end != '\0' || port > 65535) {
        errno = EINVAL;
        return -1;
    }
    *port_number = (uint16_t) port;

    // Copy everything in front of the port delimiter into a NUL-terminated
    // buffer for the resolver.
    char network[256];
    const size_t network_len = (size_t) (port_delim - network_);
    if (network_len >= sizeof (network) - 1) {
        errno = EINVAL;
        return -1;
    }
    memcpy (network, network_, network_len);
    network[network_len] = '\0';

    // The resolver is called with NULL hints.
    pgm_error_t *pgm_error = NULL;
    if (!pgm_getaddrinfo (network, NULL, res, &pgm_error)) {
        // Invalid parameters don't set pgm_error_t.
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&

            // NB: cannot catch EAI_BADFLAGS.
            (pgm_error->code != PGM_ERROR_SERVICE
             && pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
            // User, host, or network configuration or transient error.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return -1;
        }

        // Fatal OpenPGM internal error.
        zmq_assert (false);
    }
    return 0;
}
115 | |
116 | // Create, bind and connect PGM socket. |
117 | int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) |
118 | { |
119 | // Can not open transport before destroying old one. |
120 | zmq_assert (sock == NULL); |
121 | zmq_assert (options.rate > 0); |
122 | |
123 | // Zero counter used in msgrecv. |
124 | nbytes_rec = 0; |
125 | nbytes_processed = 0; |
126 | pgm_msgv_processed = 0; |
127 | |
128 | uint16_t port_number; |
129 | struct pgm_addrinfo_t *res = NULL; |
130 | sa_family_t sa_family; |
131 | |
132 | pgm_error_t *pgm_error = NULL; |
133 | |
134 | if (init_address (network_, &res, &port_number) < 0) { |
135 | goto err_abort; |
136 | } |
137 | |
138 | zmq_assert (res != NULL); |
139 | |
140 | // Pick up detected IP family. |
141 | sa_family = res->ai_send_addrs[0].gsr_group.ss_family; |
142 | |
143 | // Create IP/PGM or UDP/PGM socket. |
144 | if (udp_encapsulation_) { |
145 | if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, |
146 | &pgm_error)) { |
147 | // Invalid parameters don't set pgm_error_t. |
148 | zmq_assert (pgm_error != NULL); |
149 | if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
150 | && (pgm_error->code != PGM_ERROR_BADF |
151 | && pgm_error->code != PGM_ERROR_FAULT |
152 | && pgm_error->code != PGM_ERROR_NOPROTOOPT |
153 | && pgm_error->code != PGM_ERROR_FAILED)) |
154 | |
155 | // User, host, or network configuration or transient error. |
156 | goto err_abort; |
157 | |
158 | // Fatal OpenPGM internal error. |
159 | zmq_assert (false); |
160 | } |
161 | |
162 | // All options are of data type int |
163 | const int encapsulation_port = port_number; |
164 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, |
165 | &encapsulation_port, sizeof (encapsulation_port))) |
166 | goto err_abort; |
167 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, |
168 | &encapsulation_port, sizeof (encapsulation_port))) |
169 | goto err_abort; |
170 | } else { |
171 | if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, |
172 | &pgm_error)) { |
173 | // Invalid parameters don't set pgm_error_t. |
174 | zmq_assert (pgm_error != NULL); |
175 | if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
176 | && (pgm_error->code != PGM_ERROR_BADF |
177 | && pgm_error->code != PGM_ERROR_FAULT |
178 | && pgm_error->code != PGM_ERROR_NOPROTOOPT |
179 | && pgm_error->code != PGM_ERROR_FAILED)) |
180 | |
181 | // User, host, or network configuration or transient error. |
182 | goto err_abort; |
183 | |
184 | // Fatal OpenPGM internal error. |
185 | zmq_assert (false); |
186 | } |
187 | } |
188 | |
189 | { |
190 | const int rcvbuf = (int) options.rcvbuf; |
191 | if (rcvbuf >= 0) { |
192 | if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, |
193 | sizeof (rcvbuf))) |
194 | goto err_abort; |
195 | } |
196 | |
197 | const int sndbuf = (int) options.sndbuf; |
198 | if (sndbuf >= 0) { |
199 | if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, |
200 | sizeof (sndbuf))) |
201 | goto err_abort; |
202 | } |
203 | |
204 | const int max_tpdu = (int) options.multicast_maxtpdu; |
205 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, |
206 | sizeof (max_tpdu))) |
207 | goto err_abort; |
208 | } |
209 | |
210 | if (receiver) { |
211 | const int recv_only = 1, rxw_max_tpdu = (int) options.multicast_maxtpdu, |
212 | rxw_sqns = compute_sqns (rxw_max_tpdu), |
213 | peer_expiry = pgm_secs (300), spmr_expiry = pgm_msecs (25), |
214 | nak_bo_ivl = pgm_msecs (50), nak_rpt_ivl = pgm_msecs (200), |
215 | nak_rdata_ivl = pgm_msecs (200), nak_data_retries = 50, |
216 | nak_ncf_retries = 50; |
217 | |
218 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, |
219 | sizeof (recv_only)) |
220 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, |
221 | sizeof (rxw_sqns)) |
222 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, |
223 | &peer_expiry, sizeof (peer_expiry)) |
224 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, |
225 | &spmr_expiry, sizeof (spmr_expiry)) |
226 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, |
227 | sizeof (nak_bo_ivl)) |
228 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, |
229 | &nak_rpt_ivl, sizeof (nak_rpt_ivl)) |
230 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, |
231 | &nak_rdata_ivl, sizeof (nak_rdata_ivl)) |
232 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, |
233 | &nak_data_retries, sizeof (nak_data_retries)) |
234 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, |
235 | &nak_ncf_retries, sizeof (nak_ncf_retries))) |
236 | goto err_abort; |
237 | } else { |
238 | const int send_only = 1, max_rte = (int) ((options.rate * 1000) / 8), |
239 | txw_max_tpdu = (int) options.multicast_maxtpdu, |
240 | txw_sqns = compute_sqns (txw_max_tpdu), |
241 | ambient_spm = pgm_secs (30), |
242 | heartbeat_spm[] = { |
243 | pgm_msecs (100), pgm_msecs (100), pgm_msecs (100), |
244 | pgm_msecs (100), pgm_msecs (1300), pgm_secs (7), |
245 | pgm_secs (16), pgm_secs (25), pgm_secs (30)}; |
246 | |
247 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, &send_only, |
248 | sizeof (send_only)) |
249 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, &max_rte, |
250 | sizeof (max_rte)) |
251 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, &txw_sqns, |
252 | sizeof (txw_sqns)) |
253 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, |
254 | &ambient_spm, sizeof (ambient_spm)) |
255 | || !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, |
256 | &heartbeat_spm, sizeof (heartbeat_spm))) |
257 | goto err_abort; |
258 | } |
259 | |
260 | // PGM transport GSI. |
261 | struct pgm_sockaddr_t addr; |
262 | |
263 | memset (&addr, 0, sizeof (addr)); |
264 | addr.sa_port = port_number; |
265 | addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; |
266 | |
267 | // Create random GSI. |
268 | uint32_t buf[2]; |
269 | buf[0] = generate_random (); |
270 | buf[1] = generate_random (); |
271 | if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t *) buf, 8)) |
272 | goto err_abort; |
273 | |
274 | |
275 | // Bind a transport to the specified network devices. |
276 | struct pgm_interface_req_t if_req; |
277 | memset (&if_req, 0, sizeof (if_req)); |
278 | if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; |
279 | if_req.ir_scope_id = 0; |
280 | if (AF_INET6 == sa_family) { |
281 | struct sockaddr_in6 sa6; |
282 | memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6)); |
283 | if_req.ir_scope_id = sa6.sin6_scope_id; |
284 | } |
285 | if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), |
286 | &if_req, sizeof (if_req), &pgm_error)) { |
287 | // Invalid parameters don't set pgm_error_t. |
288 | zmq_assert (pgm_error != NULL); |
289 | if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET |
290 | || pgm_error->domain == PGM_ERROR_DOMAIN_IF) |
291 | && (pgm_error->code != PGM_ERROR_INVAL |
292 | && pgm_error->code != PGM_ERROR_BADF |
293 | && pgm_error->code != PGM_ERROR_FAULT)) |
294 | |
295 | // User, host, or network configuration or transient error. |
296 | goto err_abort; |
297 | |
298 | // Fatal OpenPGM internal error. |
299 | zmq_assert (false); |
300 | } |
301 | |
302 | // Join IP multicast groups. |
303 | for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) { |
304 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, |
305 | &res->ai_recv_addrs[i], sizeof (struct group_req))) |
306 | goto err_abort; |
307 | } |
308 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, |
309 | &res->ai_send_addrs[0], sizeof (struct group_req))) |
310 | goto err_abort; |
311 | |
312 | pgm_freeaddrinfo (res); |
313 | res = NULL; |
314 | |
315 | // Set IP level parameters. |
316 | { |
317 | // Multicast loopback disabled by default |
318 | const int multicast_loop = 0; |
319 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, |
320 | &multicast_loop, sizeof (multicast_loop))) |
321 | goto err_abort; |
322 | |
323 | const int multicast_hops = options.multicast_hops; |
324 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, |
325 | &multicast_hops, sizeof (multicast_hops))) |
326 | goto err_abort; |
327 | |
328 | // Expedited Forwarding PHB for network elements, no ECN. |
329 | // Ignore return value due to varied runtime support. |
330 | const int dscp = 0x2e << 2; |
331 | if (AF_INET6 != sa_family) |
332 | pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)); |
333 | |
334 | const int nonblocking = 1; |
335 | if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, |
336 | sizeof (nonblocking))) |
337 | goto err_abort; |
338 | } |
339 | |
340 | // Connect PGM transport to start state machine. |
341 | if (!pgm_connect (sock, &pgm_error)) { |
342 | // Invalid parameters don't set pgm_error_t. |
343 | zmq_assert (pgm_error != NULL); |
344 | goto err_abort; |
345 | } |
346 | |
347 | // For receiver transport preallocate pgm_msgv array. |
348 | if (receiver) { |
349 | zmq_assert (options.in_batch_size > 0); |
350 | size_t max_tsdu_size = get_max_tsdu_size (); |
351 | pgm_msgv_len = (int) options.in_batch_size / max_tsdu_size; |
352 | if ((int) options.in_batch_size % max_tsdu_size) |
353 | pgm_msgv_len++; |
354 | zmq_assert (pgm_msgv_len); |
355 | |
356 | pgm_msgv = (pgm_msgv_t *) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); |
357 | alloc_assert (pgm_msgv); |
358 | } |
359 | |
360 | return 0; |
361 | |
362 | err_abort: |
363 | if (sock != NULL) { |
364 | pgm_close (sock, FALSE); |
365 | sock = NULL; |
366 | } |
367 | if (res != NULL) { |
368 | pgm_freeaddrinfo (res); |
369 | res = NULL; |
370 | } |
371 | if (pgm_error != NULL) { |
372 | pgm_error_free (pgm_error); |
373 | pgm_error = NULL; |
374 | } |
375 | errno = EINVAL; |
376 | return -1; |
377 | } |
378 | |
379 | zmq::pgm_socket_t::~pgm_socket_t () |
380 | { |
381 | if (pgm_msgv) |
382 | free (pgm_msgv); |
383 | if (sock) |
384 | pgm_close (sock, TRUE); |
385 | } |
386 | |
387 | // Get receiver fds. receive_fd_ is signaled for incoming packets, |
388 | // waiting_pipe_fd_ is signaled for state driven events and data. |
389 | void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, |
390 | fd_t *waiting_pipe_fd_) |
391 | { |
392 | socklen_t socklen; |
393 | bool rc; |
394 | |
395 | zmq_assert (receive_fd_); |
396 | zmq_assert (waiting_pipe_fd_); |
397 | |
398 | socklen = sizeof (*receive_fd_); |
399 | rc = |
400 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); |
401 | zmq_assert (rc); |
402 | zmq_assert (socklen == sizeof (*receive_fd_)); |
403 | |
404 | socklen = sizeof (*waiting_pipe_fd_); |
405 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, |
406 | &socklen); |
407 | zmq_assert (rc); |
408 | zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); |
409 | } |
410 | |
411 | // Get fds and store them into user allocated memory. |
412 | // send_fd is for non-blocking send wire notifications. |
413 | // receive_fd_ is for incoming back-channel protocol packets. |
414 | // rdata_notify_fd_ is raised for waiting repair transmissions. |
415 | // pending_notify_fd_ is for state driven events. |
416 | void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, |
417 | fd_t *receive_fd_, |
418 | fd_t *rdata_notify_fd_, |
419 | fd_t *pending_notify_fd_) |
420 | { |
421 | socklen_t socklen; |
422 | bool rc; |
423 | |
424 | zmq_assert (send_fd_); |
425 | zmq_assert (receive_fd_); |
426 | zmq_assert (rdata_notify_fd_); |
427 | zmq_assert (pending_notify_fd_); |
428 | |
429 | socklen = sizeof (*send_fd_); |
430 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); |
431 | zmq_assert (rc); |
432 | zmq_assert (socklen == sizeof (*receive_fd_)); |
433 | |
434 | socklen = sizeof (*receive_fd_); |
435 | rc = |
436 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); |
437 | zmq_assert (rc); |
438 | zmq_assert (socklen == sizeof (*receive_fd_)); |
439 | |
440 | socklen = sizeof (*rdata_notify_fd_); |
441 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, |
442 | &socklen); |
443 | zmq_assert (rc); |
444 | zmq_assert (socklen == sizeof (*rdata_notify_fd_)); |
445 | |
446 | socklen = sizeof (*pending_notify_fd_); |
447 | rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, |
448 | pending_notify_fd_, &socklen); |
449 | zmq_assert (rc); |
450 | zmq_assert (socklen == sizeof (*pending_notify_fd_)); |
451 | } |
452 | |
453 | // Send one APDU, transmit window owned memory. |
454 | // data_len_ must be less than one TPDU. |
455 | size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) |
456 | { |
457 | size_t nbytes = 0; |
458 | |
459 | const int status = pgm_send (sock, data_, data_len_, &nbytes); |
460 | |
461 | // We have to write all data as one packet. |
462 | if (nbytes > 0) { |
463 | zmq_assert (status == PGM_IO_STATUS_NORMAL); |
464 | zmq_assert (nbytes == data_len_); |
465 | } else { |
466 | zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED |
467 | || status == PGM_IO_STATUS_WOULD_BLOCK); |
468 | |
469 | if (status == PGM_IO_STATUS_RATE_LIMITED) |
470 | errno = ENOMEM; |
471 | else |
472 | errno = EBUSY; |
473 | } |
474 | |
475 | // Save return value. |
476 | last_tx_status = status; |
477 | |
478 | return nbytes; |
479 | } |
480 | |
481 | long zmq::pgm_socket_t::get_rx_timeout () |
482 | { |
483 | if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED |
484 | && last_rx_status != PGM_IO_STATUS_TIMER_PENDING) |
485 | return -1; |
486 | |
487 | struct timeval tv; |
488 | socklen_t optlen = sizeof (tv); |
489 | const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, |
490 | last_rx_status == PGM_IO_STATUS_RATE_LIMITED |
491 | ? PGM_RATE_REMAIN |
492 | : PGM_TIME_REMAIN, |
493 | &tv, &optlen); |
494 | zmq_assert (rc); |
495 | |
496 | const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); |
497 | |
498 | return timeout; |
499 | } |
500 | |
501 | long zmq::pgm_socket_t::get_tx_timeout () |
502 | { |
503 | if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) |
504 | return -1; |
505 | |
506 | struct timeval tv; |
507 | socklen_t optlen = sizeof (tv); |
508 | const bool rc = |
509 | pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); |
510 | zmq_assert (rc); |
511 | |
512 | const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); |
513 | |
514 | return timeout; |
515 | } |
516 | |
517 | // Return max TSDU size without fragmentation from current PGM transport. |
518 | size_t zmq::pgm_socket_t::get_max_tsdu_size () |
519 | { |
520 | int max_tsdu = 0; |
521 | socklen_t optlen = sizeof (max_tsdu); |
522 | |
523 | bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); |
524 | zmq_assert (rc); |
525 | zmq_assert (optlen == sizeof (max_tsdu)); |
526 | return (size_t) max_tsdu; |
527 | } |
528 | |
529 | // pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len. |
530 | // In subsequent calls data from pgm_msgv structure are returned. |
531 | ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) |
532 | { |
533 | size_t raw_data_len = 0; |
534 | |
535 | // We just sent all data from pgm_transport_recvmsgv up |
536 | // and have to return 0 that another engine in this thread is scheduled. |
537 | if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { |
538 | // Reset all the counters. |
539 | nbytes_rec = 0; |
540 | nbytes_processed = 0; |
541 | pgm_msgv_processed = 0; |
542 | errno = EAGAIN; |
543 | return 0; |
544 | } |
545 | |
546 | // If we have are going first time or if we have processed all pgm_msgv_t |
547 | // structure previously read from the pgm socket. |
548 | if (nbytes_rec == nbytes_processed) { |
549 | // Check program flow. |
550 | zmq_assert (pgm_msgv_processed == 0); |
551 | zmq_assert (nbytes_processed == 0); |
552 | zmq_assert (nbytes_rec == 0); |
553 | |
554 | // Receive a vector of Application Protocol Domain Unit's (APDUs) |
555 | // from the transport. |
556 | pgm_error_t *pgm_error = NULL; |
557 | |
558 | const int status = pgm_recvmsgv (sock, pgm_msgv, pgm_msgv_len, |
559 | MSG_ERRQUEUE, &nbytes_rec, &pgm_error); |
560 | |
561 | // Invalid parameters. |
562 | zmq_assert (status != PGM_IO_STATUS_ERROR); |
563 | |
564 | last_rx_status = status; |
565 | |
566 | // In a case when no ODATA/RDATA fired POLLIN event (SPM...) |
567 | // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. |
568 | if (status == PGM_IO_STATUS_TIMER_PENDING) { |
569 | zmq_assert (nbytes_rec == 0); |
570 | |
571 | // In case if no RDATA/ODATA caused POLLIN 0 is |
572 | // returned. |
573 | nbytes_rec = 0; |
574 | errno = EBUSY; |
575 | return 0; |
576 | } |
577 | |
578 | // Send SPMR, NAK, ACK is rate limited. |
579 | if (status == PGM_IO_STATUS_RATE_LIMITED) { |
580 | zmq_assert (nbytes_rec == 0); |
581 | |
582 | // In case if no RDATA/ODATA caused POLLIN 0 is returned. |
583 | nbytes_rec = 0; |
584 | errno = ENOMEM; |
585 | return 0; |
586 | } |
587 | |
588 | // No peers and hence no incoming packets. |
589 | if (status == PGM_IO_STATUS_WOULD_BLOCK) { |
590 | zmq_assert (nbytes_rec == 0); |
591 | |
592 | // In case if no RDATA/ODATA caused POLLIN 0 is returned. |
593 | nbytes_rec = 0; |
594 | errno = EAGAIN; |
595 | return 0; |
596 | } |
597 | |
598 | // Data loss. |
599 | if (status == PGM_IO_STATUS_RESET) { |
600 | struct pgm_sk_buff_t *skb = pgm_msgv[0].msgv_skb[0]; |
601 | |
602 | // Save lost data TSI. |
603 | *tsi_ = &skb->tsi; |
604 | nbytes_rec = 0; |
605 | |
606 | // In case of dala loss -1 is returned. |
607 | errno = EINVAL; |
608 | pgm_free_skb (skb); |
609 | return -1; |
610 | } |
611 | |
612 | zmq_assert (status == PGM_IO_STATUS_NORMAL); |
613 | } else { |
614 | zmq_assert (pgm_msgv_processed <= pgm_msgv_len); |
615 | } |
616 | |
617 | // Zero byte payloads are valid in PGM, but not 0MQ protocol. |
618 | zmq_assert (nbytes_rec > 0); |
619 | |
620 | // Only one APDU per pgm_msgv_t structure is allowed. |
621 | zmq_assert (pgm_msgv[pgm_msgv_processed].msgv_len == 1); |
622 | |
623 | struct pgm_sk_buff_t *skb = pgm_msgv[pgm_msgv_processed].msgv_skb[0]; |
624 | |
625 | // Take pointers from pgm_msgv_t structure. |
626 | *raw_data_ = skb->data; |
627 | raw_data_len = skb->len; |
628 | |
629 | // Save current TSI. |
630 | *tsi_ = &skb->tsi; |
631 | |
632 | // Move the the next pgm_msgv_t structure. |
633 | pgm_msgv_processed++; |
634 | zmq_assert (pgm_msgv_processed <= pgm_msgv_len); |
635 | nbytes_processed += raw_data_len; |
636 | |
637 | return raw_data_len; |
638 | } |
639 | |
640 | void zmq::pgm_socket_t::process_upstream () |
641 | { |
642 | pgm_msgv_t dummy_msg; |
643 | |
644 | size_t dummy_bytes = 0; |
645 | pgm_error_t *pgm_error = NULL; |
646 | |
647 | const int status = pgm_recvmsgv (sock, &dummy_msg, 1, MSG_ERRQUEUE, |
648 | &dummy_bytes, &pgm_error); |
649 | |
650 | // Invalid parameters. |
651 | zmq_assert (status != PGM_IO_STATUS_ERROR); |
652 | |
653 | // No data should be returned. |
654 | zmq_assert (dummy_bytes == 0 |
655 | && (status == PGM_IO_STATUS_TIMER_PENDING |
656 | || status == PGM_IO_STATUS_RATE_LIMITED |
657 | || status == PGM_IO_STATUS_WOULD_BLOCK)); |
658 | |
659 | last_rx_status = status; |
660 | |
661 | if (status == PGM_IO_STATUS_TIMER_PENDING) |
662 | errno = EBUSY; |
663 | else if (status == PGM_IO_STATUS_RATE_LIMITED) |
664 | errno = ENOMEM; |
665 | else |
666 | errno = EAGAIN; |
667 | } |
668 | |
669 | int zmq::pgm_socket_t::compute_sqns (int tpdu_) |
670 | { |
671 | // Convert rate into B/ms. |
672 | uint64_t rate = uint64_t (options.rate) / 8; |
673 | |
674 | // Compute the size of the buffer in bytes. |
675 | uint64_t size = uint64_t (options.recovery_ivl) * rate; |
676 | |
677 | // Translate the size into number of packets. |
678 | uint64_t sqns = size / tpdu_; |
679 | |
680 | // Buffer should be able to hold at least one packet. |
681 | if (sqns == 0) |
682 | sqns = 1; |
683 | |
684 | return (int) sqns; |
685 | } |
686 | |
687 | #endif |
688 | |