1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | |
32 | #if defined ZMQ_HAVE_OPENPGM |
33 | |
34 | #include <stdlib.h> |
35 | |
36 | #include "io_thread.hpp" |
37 | #include "pgm_sender.hpp" |
38 | #include "session_base.hpp" |
39 | #include "err.hpp" |
40 | #include "wire.hpp" |
41 | #include "stdint.hpp" |
42 | #include "macros.hpp" |
43 | |
44 | zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, |
45 | const options_t &options_) : |
46 | io_object_t (parent_), |
47 | has_tx_timer (false), |
48 | has_rx_timer (false), |
49 | session (NULL), |
50 | encoder (0), |
51 | more_flag (false), |
52 | pgm_socket (false, options_), |
53 | options (options_), |
54 | handle (static_cast<handle_t> (NULL)), |
55 | uplink_handle (static_cast<handle_t> (NULL)), |
56 | rdata_notify_handle (static_cast<handle_t> (NULL)), |
57 | pending_notify_handle (static_cast<handle_t> (NULL)), |
58 | out_buffer (NULL), |
59 | out_buffer_size (0), |
60 | write_size (0) |
61 | { |
62 | int rc = msg.init (); |
63 | errno_assert (rc == 0); |
64 | } |
65 | |
66 | int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) |
67 | { |
68 | int rc = pgm_socket.init (udp_encapsulation_, network_); |
69 | if (rc != 0) |
70 | return rc; |
71 | |
72 | out_buffer_size = pgm_socket.get_max_tsdu_size (); |
73 | out_buffer = (unsigned char *) malloc (out_buffer_size); |
74 | alloc_assert (out_buffer); |
75 | |
76 | return rc; |
77 | } |
78 | |
79 | void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) |
80 | { |
81 | LIBZMQ_UNUSED (io_thread_); |
82 | // Allocate 2 fds for PGM socket. |
83 | fd_t downlink_socket_fd = retired_fd; |
84 | fd_t uplink_socket_fd = retired_fd; |
85 | fd_t rdata_notify_fd = retired_fd; |
86 | fd_t pending_notify_fd = retired_fd; |
87 | |
88 | session = session_; |
89 | |
90 | // Fill fds from PGM transport and add them to the poller. |
91 | pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, |
92 | &rdata_notify_fd, &pending_notify_fd); |
93 | |
94 | handle = add_fd (downlink_socket_fd); |
95 | uplink_handle = add_fd (uplink_socket_fd); |
96 | rdata_notify_handle = add_fd (rdata_notify_fd); |
97 | pending_notify_handle = add_fd (pending_notify_fd); |
98 | |
99 | // Set POLLIN. We wont never want to stop polling for uplink = we never |
100 | // want to stop processing NAKs. |
101 | set_pollin (uplink_handle); |
102 | set_pollin (rdata_notify_handle); |
103 | set_pollin (pending_notify_handle); |
104 | |
105 | // Set POLLOUT for downlink_socket_handle. |
106 | set_pollout (handle); |
107 | } |
108 | |
109 | void zmq::pgm_sender_t::unplug () |
110 | { |
111 | if (has_rx_timer) { |
112 | cancel_timer (rx_timer_id); |
113 | has_rx_timer = false; |
114 | } |
115 | |
116 | if (has_tx_timer) { |
117 | cancel_timer (tx_timer_id); |
118 | has_tx_timer = false; |
119 | } |
120 | |
121 | rm_fd (handle); |
122 | rm_fd (uplink_handle); |
123 | rm_fd (rdata_notify_handle); |
124 | rm_fd (pending_notify_handle); |
125 | session = NULL; |
126 | } |
127 | |
128 | void zmq::pgm_sender_t::terminate () |
129 | { |
130 | unplug (); |
131 | delete this; |
132 | } |
133 | |
134 | void zmq::pgm_sender_t::restart_output () |
135 | { |
136 | set_pollout (handle); |
137 | out_event (); |
138 | } |
139 | |
140 | bool zmq::pgm_sender_t::restart_input () |
141 | { |
142 | zmq_assert (false); |
143 | return true; |
144 | } |
145 | |
146 | const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const |
147 | { |
148 | return _empty_endpoint; |
149 | } |
150 | |
151 | zmq::pgm_sender_t::~pgm_sender_t () |
152 | { |
153 | int rc = msg.close (); |
154 | errno_assert (rc == 0); |
155 | |
156 | if (out_buffer) { |
157 | free (out_buffer); |
158 | out_buffer = NULL; |
159 | } |
160 | } |
161 | |
162 | void zmq::pgm_sender_t::in_event () |
163 | { |
164 | if (has_rx_timer) { |
165 | cancel_timer (rx_timer_id); |
166 | has_rx_timer = false; |
167 | } |
168 | |
169 | // In-event on sender side means NAK or SPMR receiving from some peer. |
170 | pgm_socket.process_upstream (); |
171 | if (errno == ENOMEM || errno == EBUSY) { |
172 | const long timeout = pgm_socket.get_rx_timeout (); |
173 | add_timer (timeout, rx_timer_id); |
174 | has_rx_timer = true; |
175 | } |
176 | } |
177 | |
178 | void zmq::pgm_sender_t::out_event () |
179 | { |
180 | // POLLOUT event from send socket. If write buffer is empty, |
181 | // try to read new data from the encoder. |
182 | if (write_size == 0) { |
183 | // First two bytes (sizeof uint16_t) are used to store message |
184 | // offset in following steps. Note that by passing our buffer to |
185 | // the get data function we prevent it from returning its own buffer. |
186 | unsigned char *bf = out_buffer + sizeof (uint16_t); |
187 | size_t bfsz = out_buffer_size - sizeof (uint16_t); |
188 | uint16_t offset = 0xffff; |
189 | |
190 | size_t bytes = encoder.encode (&bf, bfsz); |
191 | while (bytes < bfsz) { |
192 | if (!more_flag && offset == 0xffff) |
193 | offset = static_cast<uint16_t> (bytes); |
194 | int rc = session->pull_msg (&msg); |
195 | if (rc == -1) |
196 | break; |
197 | more_flag = msg.flags () & msg_t::more; |
198 | encoder.load_msg (&msg); |
199 | bf = out_buffer + sizeof (uint16_t) + bytes; |
200 | bytes += encoder.encode (&bf, bfsz - bytes); |
201 | } |
202 | |
203 | // If there are no data to write stop polling for output. |
204 | if (bytes == 0) { |
205 | reset_pollout (handle); |
206 | return; |
207 | } |
208 | |
209 | write_size = sizeof (uint16_t) + bytes; |
210 | |
211 | // Put offset information in the buffer. |
212 | put_uint16 (out_buffer, offset); |
213 | } |
214 | |
215 | if (has_tx_timer) { |
216 | cancel_timer (tx_timer_id); |
217 | set_pollout (handle); |
218 | has_tx_timer = false; |
219 | } |
220 | |
221 | // Send the data. |
222 | size_t nbytes = pgm_socket.send (out_buffer, write_size); |
223 | |
224 | // We can write either all data or 0 which means rate limit reached. |
225 | if (nbytes == write_size) |
226 | write_size = 0; |
227 | else { |
228 | zmq_assert (nbytes == 0); |
229 | |
230 | if (errno == ENOMEM) { |
231 | // Stop polling handle and wait for tx timeout |
232 | const long timeout = pgm_socket.get_tx_timeout (); |
233 | add_timer (timeout, tx_timer_id); |
234 | reset_pollout (handle); |
235 | has_tx_timer = true; |
236 | } else |
237 | errno_assert (errno == EBUSY); |
238 | } |
239 | } |
240 | |
241 | void zmq::pgm_sender_t::timer_event (int token) |
242 | { |
243 | // Timer cancels on return by poller_base. |
244 | if (token == rx_timer_id) { |
245 | has_rx_timer = false; |
246 | in_event (); |
247 | } else if (token == tx_timer_id) { |
248 | // Restart polling handle and retry sending |
249 | has_tx_timer = false; |
250 | set_pollout (handle); |
251 | out_event (); |
252 | } else |
253 | zmq_assert (false); |
254 | } |
255 | |
256 | #endif |
257 | |