1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "stream.hpp"
33#include "pipe.hpp"
34#include "wire.hpp"
35#include "random.hpp"
36#include "likely.hpp"
37#include "err.hpp"
38
39zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 routing_socket_base_t (parent_, tid_, sid_),
41 _prefetched (false),
42 _routing_id_sent (false),
43 _current_out (NULL),
44 _more_out (false),
45 _next_integral_routing_id (generate_random ())
46{
47 options.type = ZMQ_STREAM;
48 options.raw_socket = true;
49
50 _prefetched_routing_id.init ();
51 _prefetched_msg.init ();
52}
53
54zmq::stream_t::~stream_t ()
55{
56 _prefetched_routing_id.close ();
57 _prefetched_msg.close ();
58}
59
60void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
61 bool subscribe_to_all_,
62 bool locally_initiated_)
63{
64 LIBZMQ_UNUSED (subscribe_to_all_);
65
66 zmq_assert (pipe_);
67
68 identify_peer (pipe_, locally_initiated_);
69 _fq.attach (pipe_);
70}
71
72void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
73{
74 erase_out_pipe (pipe_);
75 _fq.pipe_terminated (pipe_);
76 // TODO router_t calls pipe_->rollback() here; should this be done here as
77 // well? then xpipe_terminated could be pulled up to routing_socket_base_t
78 if (pipe_ == _current_out)
79 _current_out = NULL;
80}
81
82void zmq::stream_t::xread_activated (pipe_t *pipe_)
83{
84 _fq.activated (pipe_);
85}
86
87int zmq::stream_t::xsend (msg_t *msg_)
88{
89 // If this is the first part of the message it's the ID of the
90 // peer to send the message to.
91 if (!_more_out) {
92 zmq_assert (!_current_out);
93
94 // If we have malformed message (prefix with no subsequent message)
95 // then just silently ignore it.
96 // TODO: The connections should be killed instead.
97 if (msg_->flags () & msg_t::more) {
98 // Find the pipe associated with the routing id stored in the prefix.
99 // If there's no such pipe return an error
100
101 out_pipe_t *out_pipe = lookup_out_pipe (
102 blob_t (static_cast<unsigned char *> (msg_->data ()),
103 msg_->size (), reference_tag_t ()));
104
105 if (out_pipe) {
106 _current_out = out_pipe->pipe;
107 if (!_current_out->check_write ()) {
108 out_pipe->active = false;
109 _current_out = NULL;
110 errno = EAGAIN;
111 return -1;
112 }
113 } else {
114 errno = EHOSTUNREACH;
115 return -1;
116 }
117 }
118
119 // Expect one more message frame.
120 _more_out = true;
121
122 int rc = msg_->close ();
123 errno_assert (rc == 0);
124 rc = msg_->init ();
125 errno_assert (rc == 0);
126 return 0;
127 }
128
129 // Ignore the MORE flag
130 msg_->reset_flags (msg_t::more);
131
132 // This is the last part of the message.
133 _more_out = false;
134
135 // Push the message into the pipe. If there's no out pipe, just drop it.
136 if (_current_out) {
137 // Close the remote connection if user has asked to do so
138 // by sending zero length message.
139 // Pending messages in the pipe will be dropped (on receiving term- ack)
140 if (msg_->size () == 0) {
141 _current_out->terminate (false);
142 int rc = msg_->close ();
143 errno_assert (rc == 0);
144 rc = msg_->init ();
145 errno_assert (rc == 0);
146 _current_out = NULL;
147 return 0;
148 }
149 bool ok = _current_out->write (msg_);
150 if (likely (ok))
151 _current_out->flush ();
152 _current_out = NULL;
153 } else {
154 int rc = msg_->close ();
155 errno_assert (rc == 0);
156 }
157
158 // Detach the message from the data buffer.
159 int rc = msg_->init ();
160 errno_assert (rc == 0);
161
162 return 0;
163}
164
165int zmq::stream_t::xsetsockopt (int option_,
166 const void *optval_,
167 size_t optvallen_)
168{
169 switch (option_) {
170 case ZMQ_STREAM_NOTIFY:
171 return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
172 &options.raw_notify);
173
174 default:
175 return routing_socket_base_t::xsetsockopt (option_, optval_,
176 optvallen_);
177 }
178}
179
180int zmq::stream_t::xrecv (msg_t *msg_)
181{
182 if (_prefetched) {
183 if (!_routing_id_sent) {
184 int rc = msg_->move (_prefetched_routing_id);
185 errno_assert (rc == 0);
186 _routing_id_sent = true;
187 } else {
188 int rc = msg_->move (_prefetched_msg);
189 errno_assert (rc == 0);
190 _prefetched = false;
191 }
192 return 0;
193 }
194
195 pipe_t *pipe = NULL;
196 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
197 if (rc != 0)
198 return -1;
199
200 zmq_assert (pipe != NULL);
201 zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
202
203 // We have received a frame with TCP data.
204 // Rather than sending this frame, we keep it in prefetched
205 // buffer and send a frame with peer's ID.
206 const blob_t &routing_id = pipe->get_routing_id ();
207 rc = msg_->close ();
208 errno_assert (rc == 0);
209 rc = msg_->init_size (routing_id.size ());
210 errno_assert (rc == 0);
211
212 // forward metadata (if any)
213 metadata_t *metadata = _prefetched_msg.metadata ();
214 if (metadata)
215 msg_->set_metadata (metadata);
216
217 memcpy (msg_->data (), routing_id.data (), routing_id.size ());
218 msg_->set_flags (msg_t::more);
219
220 _prefetched = true;
221 _routing_id_sent = true;
222
223 return 0;
224}
225
226bool zmq::stream_t::xhas_in ()
227{
228 // We may already have a message pre-fetched.
229 if (_prefetched)
230 return true;
231
232 // Try to read the next message.
233 // The message, if read, is kept in the pre-fetch buffer.
234 pipe_t *pipe = NULL;
235 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
236 if (rc != 0)
237 return false;
238
239 zmq_assert (pipe != NULL);
240 zmq_assert ((_prefetched_msg.flags () & msg_t::more) == 0);
241
242 const blob_t &routing_id = pipe->get_routing_id ();
243 rc = _prefetched_routing_id.init_size (routing_id.size ());
244 errno_assert (rc == 0);
245
246 // forward metadata (if any)
247 metadata_t *metadata = _prefetched_msg.metadata ();
248 if (metadata)
249 _prefetched_routing_id.set_metadata (metadata);
250
251 memcpy (_prefetched_routing_id.data (), routing_id.data (),
252 routing_id.size ());
253 _prefetched_routing_id.set_flags (msg_t::more);
254
255 _prefetched = true;
256 _routing_id_sent = false;
257
258 return true;
259}
260
261bool zmq::stream_t::xhas_out ()
262{
263 // In theory, STREAM socket is always ready for writing. Whether actual
264 // attempt to write succeeds depends on which pipe the message is going
265 // to be routed to.
266 return true;
267}
268
269void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
270{
271 // Always assign routing id for raw-socket
272 unsigned char buffer[5];
273 buffer[0] = 0;
274 blob_t routing_id;
275 if (locally_initiated_ && connect_routing_id_is_set ()) {
276 const std::string connect_routing_id = extract_connect_routing_id ();
277 routing_id.set (
278 reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
279 connect_routing_id.length ());
280 // Not allowed to duplicate an existing rid
281 zmq_assert (!has_out_pipe (routing_id));
282 } else {
283 put_uint32 (buffer + 1, _next_integral_routing_id++);
284 routing_id.set (buffer, sizeof buffer);
285 memcpy (options.routing_id, routing_id.data (), routing_id.size ());
286 options.routing_id_size =
287 static_cast<unsigned char> (routing_id.size ());
288 }
289 pipe_->set_router_socket_routing_id (routing_id);
290 add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
291}
292