1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31
32#if defined ZMQ_HAVE_OPENPGM
33
34#include <stdlib.h>
35
36#include "io_thread.hpp"
37#include "pgm_sender.hpp"
38#include "session_base.hpp"
39#include "err.hpp"
40#include "wire.hpp"
41#include "stdint.hpp"
42#include "macros.hpp"
43
44zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
45 const options_t &options_) :
46 io_object_t (parent_),
47 has_tx_timer (false),
48 has_rx_timer (false),
49 session (NULL),
50 encoder (0),
51 more_flag (false),
52 pgm_socket (false, options_),
53 options (options_),
54 handle (static_cast<handle_t> (NULL)),
55 uplink_handle (static_cast<handle_t> (NULL)),
56 rdata_notify_handle (static_cast<handle_t> (NULL)),
57 pending_notify_handle (static_cast<handle_t> (NULL)),
58 out_buffer (NULL),
59 out_buffer_size (0),
60 write_size (0)
61{
62 int rc = msg.init ();
63 errno_assert (rc == 0);
64}
65
66int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
67{
68 int rc = pgm_socket.init (udp_encapsulation_, network_);
69 if (rc != 0)
70 return rc;
71
72 out_buffer_size = pgm_socket.get_max_tsdu_size ();
73 out_buffer = (unsigned char *) malloc (out_buffer_size);
74 alloc_assert (out_buffer);
75
76 return rc;
77}
78
79void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
80{
81 LIBZMQ_UNUSED (io_thread_);
82 // Allocate 2 fds for PGM socket.
83 fd_t downlink_socket_fd = retired_fd;
84 fd_t uplink_socket_fd = retired_fd;
85 fd_t rdata_notify_fd = retired_fd;
86 fd_t pending_notify_fd = retired_fd;
87
88 session = session_;
89
90 // Fill fds from PGM transport and add them to the poller.
91 pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
92 &rdata_notify_fd, &pending_notify_fd);
93
94 handle = add_fd (downlink_socket_fd);
95 uplink_handle = add_fd (uplink_socket_fd);
96 rdata_notify_handle = add_fd (rdata_notify_fd);
97 pending_notify_handle = add_fd (pending_notify_fd);
98
99 // Set POLLIN. We wont never want to stop polling for uplink = we never
100 // want to stop processing NAKs.
101 set_pollin (uplink_handle);
102 set_pollin (rdata_notify_handle);
103 set_pollin (pending_notify_handle);
104
105 // Set POLLOUT for downlink_socket_handle.
106 set_pollout (handle);
107}
108
109void zmq::pgm_sender_t::unplug ()
110{
111 if (has_rx_timer) {
112 cancel_timer (rx_timer_id);
113 has_rx_timer = false;
114 }
115
116 if (has_tx_timer) {
117 cancel_timer (tx_timer_id);
118 has_tx_timer = false;
119 }
120
121 rm_fd (handle);
122 rm_fd (uplink_handle);
123 rm_fd (rdata_notify_handle);
124 rm_fd (pending_notify_handle);
125 session = NULL;
126}
127
128void zmq::pgm_sender_t::terminate ()
129{
130 unplug ();
131 delete this;
132}
133
134void zmq::pgm_sender_t::restart_output ()
135{
136 set_pollout (handle);
137 out_event ();
138}
139
140bool zmq::pgm_sender_t::restart_input ()
141{
142 zmq_assert (false);
143 return true;
144}
145
146const zmq::endpoint_uri_pair_t &zmq::pgm_sender_t::get_endpoint () const
147{
148 return _empty_endpoint;
149}
150
151zmq::pgm_sender_t::~pgm_sender_t ()
152{
153 int rc = msg.close ();
154 errno_assert (rc == 0);
155
156 if (out_buffer) {
157 free (out_buffer);
158 out_buffer = NULL;
159 }
160}
161
162void zmq::pgm_sender_t::in_event ()
163{
164 if (has_rx_timer) {
165 cancel_timer (rx_timer_id);
166 has_rx_timer = false;
167 }
168
169 // In-event on sender side means NAK or SPMR receiving from some peer.
170 pgm_socket.process_upstream ();
171 if (errno == ENOMEM || errno == EBUSY) {
172 const long timeout = pgm_socket.get_rx_timeout ();
173 add_timer (timeout, rx_timer_id);
174 has_rx_timer = true;
175 }
176}
177
178void zmq::pgm_sender_t::out_event ()
179{
180 // POLLOUT event from send socket. If write buffer is empty,
181 // try to read new data from the encoder.
182 if (write_size == 0) {
183 // First two bytes (sizeof uint16_t) are used to store message
184 // offset in following steps. Note that by passing our buffer to
185 // the get data function we prevent it from returning its own buffer.
186 unsigned char *bf = out_buffer + sizeof (uint16_t);
187 size_t bfsz = out_buffer_size - sizeof (uint16_t);
188 uint16_t offset = 0xffff;
189
190 size_t bytes = encoder.encode (&bf, bfsz);
191 while (bytes < bfsz) {
192 if (!more_flag && offset == 0xffff)
193 offset = static_cast<uint16_t> (bytes);
194 int rc = session->pull_msg (&msg);
195 if (rc == -1)
196 break;
197 more_flag = msg.flags () & msg_t::more;
198 encoder.load_msg (&msg);
199 bf = out_buffer + sizeof (uint16_t) + bytes;
200 bytes += encoder.encode (&bf, bfsz - bytes);
201 }
202
203 // If there are no data to write stop polling for output.
204 if (bytes == 0) {
205 reset_pollout (handle);
206 return;
207 }
208
209 write_size = sizeof (uint16_t) + bytes;
210
211 // Put offset information in the buffer.
212 put_uint16 (out_buffer, offset);
213 }
214
215 if (has_tx_timer) {
216 cancel_timer (tx_timer_id);
217 set_pollout (handle);
218 has_tx_timer = false;
219 }
220
221 // Send the data.
222 size_t nbytes = pgm_socket.send (out_buffer, write_size);
223
224 // We can write either all data or 0 which means rate limit reached.
225 if (nbytes == write_size)
226 write_size = 0;
227 else {
228 zmq_assert (nbytes == 0);
229
230 if (errno == ENOMEM) {
231 // Stop polling handle and wait for tx timeout
232 const long timeout = pgm_socket.get_tx_timeout ();
233 add_timer (timeout, tx_timer_id);
234 reset_pollout (handle);
235 has_tx_timer = true;
236 } else
237 errno_assert (errno == EBUSY);
238 }
239}
240
241void zmq::pgm_sender_t::timer_event (int token)
242{
243 // Timer cancels on return by poller_base.
244 if (token == rx_timer_id) {
245 has_rx_timer = false;
246 in_event ();
247 } else if (token == tx_timer_id) {
248 // Restart polling handle and retry sending
249 has_tx_timer = false;
250 set_pollout (handle);
251 out_event ();
252 } else
253 zmq_assert (false);
254}
255
256#endif
257