1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "req.hpp"
33#include "err.hpp"
34#include "msg.hpp"
35#include "wire.hpp"
36#include "random.hpp"
37#include "likely.hpp"
38
39zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 dealer_t (parent_, tid_, sid_),
41 _receiving_reply (false),
42 _message_begins (true),
43 _reply_pipe (NULL),
44 _request_id_frames_enabled (false),
45 _request_id (generate_random ()),
46 _strict (true)
47{
48 options.type = ZMQ_REQ;
49}
50
51zmq::req_t::~req_t ()
52{
53}
54
55int zmq::req_t::xsend (msg_t *msg_)
56{
57 // If we've sent a request and we still haven't got the reply,
58 // we can't send another request unless the strict option is disabled.
59 if (_receiving_reply) {
60 if (_strict) {
61 errno = EFSM;
62 return -1;
63 }
64
65 _receiving_reply = false;
66 _message_begins = true;
67 }
68
69 // First part of the request is the request routing id.
70 if (_message_begins) {
71 _reply_pipe = NULL;
72
73 if (_request_id_frames_enabled) {
74 _request_id++;
75
76 msg_t id;
77 int rc = id.init_size (sizeof (uint32_t));
78 memcpy (id.data (), &_request_id, sizeof (uint32_t));
79 errno_assert (rc == 0);
80 id.set_flags (msg_t::more);
81
82 rc = dealer_t::sendpipe (&id, &_reply_pipe);
83 if (rc != 0) {
84 return -1;
85 }
86 }
87
88 msg_t bottom;
89 int rc = bottom.init ();
90 errno_assert (rc == 0);
91 bottom.set_flags (msg_t::more);
92
93 rc = dealer_t::sendpipe (&bottom, &_reply_pipe);
94 if (rc != 0)
95 return -1;
96 zmq_assert (_reply_pipe);
97
98 _message_begins = false;
99
100 // Eat all currently available messages before the request is fully
101 // sent. This is done to avoid:
102 // REQ sends request to A, A replies, B replies too.
103 // A's reply was first and matches, that is used.
104 // An hour later REQ sends a request to B. B's old reply is used.
105 msg_t drop;
106 while (true) {
107 rc = drop.init ();
108 errno_assert (rc == 0);
109 rc = dealer_t::xrecv (&drop);
110 if (rc != 0)
111 break;
112 drop.close ();
113 }
114 }
115
116 bool more = (msg_->flags () & msg_t::more) != 0;
117
118 int rc = dealer_t::xsend (msg_);
119 if (rc != 0)
120 return rc;
121
122 // If the request was fully sent, flip the FSM into reply-receiving state.
123 if (!more) {
124 _receiving_reply = true;
125 _message_begins = true;
126 }
127
128 return 0;
129}
130
131int zmq::req_t::xrecv (msg_t *msg_)
132{
133 // If request wasn't send, we can't wait for reply.
134 if (!_receiving_reply) {
135 errno = EFSM;
136 return -1;
137 }
138
139 // Skip messages until one with the right first frames is found.
140 while (_message_begins) {
141 // If enabled, the first frame must have the correct request_id.
142 if (_request_id_frames_enabled) {
143 int rc = recv_reply_pipe (msg_);
144 if (rc != 0)
145 return rc;
146
147 if (unlikely (!(msg_->flags () & msg_t::more)
148 || msg_->size () != sizeof (_request_id)
149 || *static_cast<uint32_t *> (msg_->data ())
150 != _request_id)) {
151 // Skip the remaining frames and try the next message
152 while (msg_->flags () & msg_t::more) {
153 rc = recv_reply_pipe (msg_);
154 errno_assert (rc == 0);
155 }
156 continue;
157 }
158 }
159
160 // The next frame must be 0.
161 // TODO: Failing this check should also close the connection with the peer!
162 int rc = recv_reply_pipe (msg_);
163 if (rc != 0)
164 return rc;
165
166 if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
167 // Skip the remaining frames and try the next message
168 while (msg_->flags () & msg_t::more) {
169 rc = recv_reply_pipe (msg_);
170 errno_assert (rc == 0);
171 }
172 continue;
173 }
174
175 _message_begins = false;
176 }
177
178 int rc = recv_reply_pipe (msg_);
179 if (rc != 0)
180 return rc;
181
182 // If the reply is fully received, flip the FSM into request-sending state.
183 if (!(msg_->flags () & msg_t::more)) {
184 _receiving_reply = false;
185 _message_begins = true;
186 }
187
188 return 0;
189}
190
191bool zmq::req_t::xhas_in ()
192{
193 // TODO: Duplicates should be removed here.
194
195 if (!_receiving_reply)
196 return false;
197
198 return dealer_t::xhas_in ();
199}
200
201bool zmq::req_t::xhas_out ()
202{
203 if (_receiving_reply && _strict)
204 return false;
205
206 return dealer_t::xhas_out ();
207}
208
209int zmq::req_t::xsetsockopt (int option_,
210 const void *optval_,
211 size_t optvallen_)
212{
213 bool is_int = (optvallen_ == sizeof (int));
214 int value = 0;
215 if (is_int)
216 memcpy (&value, optval_, sizeof (int));
217
218 switch (option_) {
219 case ZMQ_REQ_CORRELATE:
220 if (is_int && value >= 0) {
221 _request_id_frames_enabled = (value != 0);
222 return 0;
223 }
224 break;
225
226 case ZMQ_REQ_RELAXED:
227 if (is_int && value >= 0) {
228 _strict = (value == 0);
229 return 0;
230 }
231 break;
232
233 default:
234 break;
235 }
236
237 return dealer_t::xsetsockopt (option_, optval_, optvallen_);
238}
239
240void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
241{
242 if (_reply_pipe == pipe_)
243 _reply_pipe = NULL;
244 dealer_t::xpipe_terminated (pipe_);
245}
246
247int zmq::req_t::recv_reply_pipe (msg_t *msg_)
248{
249 while (true) {
250 pipe_t *pipe = NULL;
251 int rc = dealer_t::recvpipe (msg_, &pipe);
252 if (rc != 0)
253 return rc;
254 if (!_reply_pipe || pipe == _reply_pipe)
255 return 0;
256 }
257}
258
259zmq::req_session_t::req_session_t (io_thread_t *io_thread_,
260 bool connect_,
261 socket_base_t *socket_,
262 const options_t &options_,
263 address_t *addr_) :
264 session_base_t (io_thread_, connect_, socket_, options_, addr_),
265 _state (bottom)
266{
267}
268
269zmq::req_session_t::~req_session_t ()
270{
271}
272
273int zmq::req_session_t::push_msg (msg_t *msg_)
274{
275 // Ignore commands, they are processed by the engine and should not
276 // affect the state machine.
277 if (unlikely (msg_->flags () & msg_t::command))
278 return 0;
279
280 switch (_state) {
281 case bottom:
282 if (msg_->flags () == msg_t::more) {
283 // In case option ZMQ_CORRELATE is on, allow request_id to be
284 // transfered as first frame (would be too cumbersome to check
285 // whether the option is actually on or not).
286 if (msg_->size () == sizeof (uint32_t)) {
287 _state = request_id;
288 return session_base_t::push_msg (msg_);
289 }
290 if (msg_->size () == 0) {
291 _state = body;
292 return session_base_t::push_msg (msg_);
293 }
294 }
295 break;
296 case request_id:
297 if (msg_->flags () == msg_t::more && msg_->size () == 0) {
298 _state = body;
299 return session_base_t::push_msg (msg_);
300 }
301 break;
302 case body:
303 if (msg_->flags () == msg_t::more)
304 return session_base_t::push_msg (msg_);
305 if (msg_->flags () == 0) {
306 _state = bottom;
307 return session_base_t::push_msg (msg_);
308 }
309 break;
310 }
311 errno = EFAULT;
312 return -1;
313}
314
315void zmq::req_session_t::reset ()
316{
317 session_base_t::reset ();
318 _state = bottom;
319}
320