1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32
33#if defined ZMQ_HAVE_OPENPGM
34
35#include <new>
36
37#include "pgm_receiver.hpp"
38#include "session_base.hpp"
39#include "v1_decoder.hpp"
40#include "stdint.hpp"
41#include "wire.hpp"
42#include "err.hpp"
43
44zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
45 const options_t &options_) :
46 io_object_t (parent_),
47 has_rx_timer (false),
48 pgm_socket (true, options_),
49 options (options_),
50 session (NULL),
51 active_tsi (NULL),
52 insize (0)
53{
54}
55
56zmq::pgm_receiver_t::~pgm_receiver_t ()
57{
58 // Destructor should not be called before unplug.
59 zmq_assert (peers.empty ());
60}
61
62int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
63{
64 return pgm_socket.init (udp_encapsulation_, network_);
65}
66
67void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
68 session_base_t *session_)
69{
70 LIBZMQ_UNUSED (io_thread_);
71 // Retrieve PGM fds and start polling.
72 fd_t socket_fd = retired_fd;
73 fd_t waiting_pipe_fd = retired_fd;
74 pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
75 socket_handle = add_fd (socket_fd);
76 pipe_handle = add_fd (waiting_pipe_fd);
77 set_pollin (pipe_handle);
78 set_pollin (socket_handle);
79
80 session = session_;
81
82 // If there are any subscriptions already queued in the session, drop them.
83 drop_subscriptions ();
84}
85
86void zmq::pgm_receiver_t::unplug ()
87{
88 // Delete decoders.
89 for (peers_t::iterator it = peers.begin (), end = peers.end (); it != end;
90 ++it) {
91 if (it->second.decoder != NULL) {
92 LIBZMQ_DELETE (it->second.decoder);
93 }
94 }
95 peers.clear ();
96 active_tsi = NULL;
97
98 if (has_rx_timer) {
99 cancel_timer (rx_timer_id);
100 has_rx_timer = false;
101 }
102
103 rm_fd (socket_handle);
104 rm_fd (pipe_handle);
105
106 session = NULL;
107}
108
109void zmq::pgm_receiver_t::terminate ()
110{
111 unplug ();
112 delete this;
113}
114
115void zmq::pgm_receiver_t::restart_output ()
116{
117 drop_subscriptions ();
118}
119
120bool zmq::pgm_receiver_t::restart_input ()
121{
122 zmq_assert (session != NULL);
123 zmq_assert (active_tsi != NULL);
124
125 const peers_t::iterator it = peers.find (*active_tsi);
126 zmq_assert (it != peers.end ());
127 zmq_assert (it->second.joined);
128
129 // Push the pending message into the session.
130 int rc = session->push_msg (it->second.decoder->msg ());
131 errno_assert (rc == 0);
132
133 if (insize > 0) {
134 rc = process_input (it->second.decoder);
135 if (rc == -1) {
136 // HWM reached; we will try later.
137 if (errno == EAGAIN) {
138 session->flush ();
139 return true;
140 }
141 // Data error. Delete message decoder, mark the
142 // peer as not joined and drop remaining data.
143 it->second.joined = false;
144 LIBZMQ_DELETE (it->second.decoder);
145 insize = 0;
146 }
147 }
148
149 // Resume polling.
150 set_pollin (pipe_handle);
151 set_pollin (socket_handle);
152
153 active_tsi = NULL;
154 in_event ();
155
156 return true;
157}
158
159const zmq::endpoint_uri_pair_t &zmq::pgm_receiver_t::get_endpoint () const
160{
161 return _empty_endpoint;
162}
163
164void zmq::pgm_receiver_t::in_event ()
165{
166 // If active_tsi is not null, there is a pending restart_input.
167 // Keep the internal state as is so that restart_input would process the right data
168 if (active_tsi) {
169 return;
170 }
171
172 // Read data from the underlying pgm_socket.
173 const pgm_tsi_t *tsi = NULL;
174
175 if (has_rx_timer) {
176 cancel_timer (rx_timer_id);
177 has_rx_timer = false;
178 }
179
180 // TODO: This loop can effectively block other engines in the same I/O
181 // thread in the case of high load.
182 while (true) {
183 // Get new batch of data.
184 // Note the workaround made not to break strict-aliasing rules.
185 insize = 0;
186 void *tmp = NULL;
187 ssize_t received = pgm_socket.receive (&tmp, &tsi);
188
189 // No data to process. This may happen if the packet received is
190 // neither ODATA nor ODATA.
191 if (received == 0) {
192 if (errno == ENOMEM || errno == EBUSY) {
193 const long timeout = pgm_socket.get_rx_timeout ();
194 add_timer (timeout, rx_timer_id);
195 has_rx_timer = true;
196 }
197 break;
198 }
199
200 // Find the peer based on its TSI.
201 peers_t::iterator it = peers.find (*tsi);
202
203 // Data loss. Delete decoder and mark the peer as disjoint.
204 if (received == -1) {
205 if (it != peers.end ()) {
206 it->second.joined = false;
207 if (it->second.decoder != NULL) {
208 LIBZMQ_DELETE (it->second.decoder);
209 }
210 }
211 break;
212 }
213
214 // New peer. Add it to the list of know but unjoint peers.
215 if (it == peers.end ()) {
216 peer_info_t peer_info = {false, NULL};
217 it = peers.ZMQ_MAP_INSERT_OR_EMPLACE (*tsi, peer_info).first;
218 }
219
220 insize = static_cast<size_t> (received);
221 inpos = (unsigned char *) tmp;
222
223 // Read the offset of the fist message in the current packet.
224 zmq_assert (insize >= sizeof (uint16_t));
225 uint16_t offset = get_uint16 (inpos);
226 inpos += sizeof (uint16_t);
227 insize -= sizeof (uint16_t);
228
229 // Join the stream if needed.
230 if (!it->second.joined) {
231 // There is no beginning of the message in current packet.
232 // Ignore the data.
233 if (offset == 0xffff)
234 continue;
235
236 zmq_assert (offset <= insize);
237 zmq_assert (it->second.decoder == NULL);
238
239 // We have to move data to the beginning of the first message.
240 inpos += offset;
241 insize -= offset;
242
243 // Mark the stream as joined.
244 it->second.joined = true;
245
246 // Create and connect decoder for the peer.
247 it->second.decoder =
248 new (std::nothrow) v1_decoder_t (0, options.maxmsgsize);
249 alloc_assert (it->second.decoder);
250 }
251
252 int rc = process_input (it->second.decoder);
253 if (rc == -1) {
254 if (errno == EAGAIN) {
255 active_tsi = tsi;
256
257 // Stop polling.
258 reset_pollin (pipe_handle);
259 reset_pollin (socket_handle);
260
261 break;
262 }
263
264 it->second.joined = false;
265 LIBZMQ_DELETE (it->second.decoder);
266 insize = 0;
267 }
268 }
269
270 // Flush any messages decoder may have produced.
271 session->flush ();
272}
273
274int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
275{
276 zmq_assert (session != NULL);
277
278 while (insize > 0) {
279 size_t n = 0;
280 int rc = decoder->decode (inpos, insize, n);
281 if (rc == -1)
282 return -1;
283 inpos += n;
284 insize -= n;
285 if (rc == 0)
286 break;
287 rc = session->push_msg (decoder->msg ());
288 if (rc == -1) {
289 errno_assert (errno == EAGAIN);
290 return -1;
291 }
292 }
293 return 0;
294}
295
296
297void zmq::pgm_receiver_t::timer_event (int token)
298{
299 zmq_assert (token == rx_timer_id);
300
301 // Timer cancels on return by poller_base.
302 has_rx_timer = false;
303 in_event ();
304}
305
306void zmq::pgm_receiver_t::drop_subscriptions ()
307{
308 msg_t msg;
309 msg.init ();
310 while (session->pull_msg (&msg) == 0)
311 msg.close ();
312}
313
314#endif
315