1/*
2 Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "lb.hpp"
32#include "pipe.hpp"
33#include "err.hpp"
34#include "msg.hpp"
35
36zmq::lb_t::lb_t () : _active (0), _current (0), _more (false), _dropping (false)
37{
38}
39
40zmq::lb_t::~lb_t ()
41{
42 zmq_assert (_pipes.empty ());
43}
44
45void zmq::lb_t::attach (pipe_t *pipe_)
46{
47 _pipes.push_back (pipe_);
48 activated (pipe_);
49}
50
51void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
52{
53 pipes_t::size_type index = _pipes.index (pipe_);
54
55 // If we are in the middle of multipart message and current pipe
56 // have disconnected, we have to drop the remainder of the message.
57 if (index == _current && _more)
58 _dropping = true;
59
60 // Remove the pipe from the list; adjust number of active pipes
61 // accordingly.
62 if (index < _active) {
63 _active--;
64 _pipes.swap (index, _active);
65 if (_current == _active)
66 _current = 0;
67 }
68 _pipes.erase (pipe_);
69}
70
71void zmq::lb_t::activated (pipe_t *pipe_)
72{
73 // Move the pipe to the list of active pipes.
74 _pipes.swap (_pipes.index (pipe_), _active);
75 _active++;
76}
77
78int zmq::lb_t::send (msg_t *msg_)
79{
80 return sendpipe (msg_, NULL);
81}
82
83int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
84{
85 // Drop the message if required. If we are at the end of the message
86 // switch back to non-dropping mode.
87 if (_dropping) {
88 _more = (msg_->flags () & msg_t::more) != 0;
89 _dropping = _more;
90
91 int rc = msg_->close ();
92 errno_assert (rc == 0);
93 rc = msg_->init ();
94 errno_assert (rc == 0);
95 return 0;
96 }
97
98 while (_active > 0) {
99 if (_pipes[_current]->write (msg_)) {
100 if (pipe_)
101 *pipe_ = _pipes[_current];
102 break;
103 }
104
105 // If send fails for multi-part msg rollback other
106 // parts sent earlier and return EAGAIN.
107 // Application should handle this as suitable
108 if (_more) {
109 _pipes[_current]->rollback ();
110 // At this point the pipe is already being deallocated
111 // and the first N frames are unreachable (_outpipe is
112 // most likely already NULL so rollback won't actually do
113 // anything and they can't be un-written to deliver later).
114 // Return EFAULT to socket_base caller to drop current message
115 // and any other subsequent frames to avoid them being
116 // "stuck" and received when a new client reconnects, which
117 // would break atomicity of multi-part messages (in blocking mode
118 // socket_base just tries again and again to send the same message)
119 // Note that given dropping mode returns 0, the user will
120 // never know that the message could not be delivered, but
121 // can't really fix it without breaking backward compatibility.
122 // -2/EAGAIN will make sure socket_base caller does not re-enter
123 // immediately or after a short sleep in blocking mode.
124 _dropping = (msg_->flags () & msg_t::more) != 0;
125 _more = false;
126 errno = EAGAIN;
127 return -2;
128 }
129
130 _active--;
131 if (_current < _active)
132 _pipes.swap (_current, _active);
133 else
134 _current = 0;
135 }
136
137 // If there are no pipes we cannot send the message.
138 if (_active == 0) {
139 errno = EAGAIN;
140 return -1;
141 }
142
143 // If it's final part of the message we can flush it downstream and
144 // continue round-robining (load balance).
145 _more = (msg_->flags () & msg_t::more) != 0;
146 if (!_more) {
147 _pipes[_current]->flush ();
148
149 if (++_current >= _active)
150 _current = 0;
151 }
152
153 // Detach the message from the data buffer.
154 int rc = msg_->init ();
155 errno_assert (rc == 0);
156
157 return 0;
158}
159
160bool zmq::lb_t::has_out ()
161{
162 // If one part of the message was already written we can definitely
163 // write the rest of the message.
164 if (_more)
165 return true;
166
167 while (_active > 0) {
168 // Check whether a pipe has room for another message.
169 if (_pipes[_current]->check_write ())
170 return true;
171
172 // Deactivate the pipe.
173 _active--;
174 _pipes.swap (_current, _active);
175 if (_current == _active)
176 _current = 0;
177 }
178
179 return false;
180}
181