1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "dist.hpp"
32#include "pipe.hpp"
33#include "err.hpp"
34#include "msg.hpp"
35#include "likely.hpp"
36
37zmq::dist_t::dist_t () :
38 _matching (0),
39 _active (0),
40 _eligible (0),
41 _more (false)
42{
43}
44
45zmq::dist_t::~dist_t ()
46{
47 zmq_assert (_pipes.empty ());
48}
49
50void zmq::dist_t::attach (pipe_t *pipe_)
51{
52 // If we are in the middle of sending a message, we'll add new pipe
53 // into the list of eligible pipes. Otherwise we add it to the list
54 // of active pipes.
55 if (_more) {
56 _pipes.push_back (pipe_);
57 _pipes.swap (_eligible, _pipes.size () - 1);
58 _eligible++;
59 } else {
60 _pipes.push_back (pipe_);
61 _pipes.swap (_active, _pipes.size () - 1);
62 _active++;
63 _eligible++;
64 }
65}
66
67void zmq::dist_t::match (pipe_t *pipe_)
68{
69 // If pipe is already matching do nothing.
70 if (_pipes.index (pipe_) < _matching)
71 return;
72
73 // If the pipe isn't eligible, ignore it.
74 if (_pipes.index (pipe_) >= _eligible)
75 return;
76
77 // Mark the pipe as matching.
78 _pipes.swap (_pipes.index (pipe_), _matching);
79 _matching++;
80}
81
82void zmq::dist_t::reverse_match ()
83{
84 pipes_t::size_type prev_matching = _matching;
85
86 // Reset matching to 0
87 unmatch ();
88
89 // Mark all matching pipes as not matching and vice-versa.
90 // To do this, push all pipes that are eligible but not
91 // matched - i.e. between "matching" and "eligible" -
92 // to the beginning of the queue.
93 for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
94 _pipes.swap (i, _matching++);
95 }
96}
97
98void zmq::dist_t::unmatch ()
99{
100 _matching = 0;
101}
102
103void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
104{
105 // Remove the pipe from the list; adjust number of matching, active and/or
106 // eligible pipes accordingly.
107 if (_pipes.index (pipe_) < _matching) {
108 _pipes.swap (_pipes.index (pipe_), _matching - 1);
109 _matching--;
110 }
111 if (_pipes.index (pipe_) < _active) {
112 _pipes.swap (_pipes.index (pipe_), _active - 1);
113 _active--;
114 }
115 if (_pipes.index (pipe_) < _eligible) {
116 _pipes.swap (_pipes.index (pipe_), _eligible - 1);
117 _eligible--;
118 }
119
120 _pipes.erase (pipe_);
121}
122
123void zmq::dist_t::activated (pipe_t *pipe_)
124{
125 // Move the pipe from passive to eligible state.
126 if (_eligible < _pipes.size ()) {
127 _pipes.swap (_pipes.index (pipe_), _eligible);
128 _eligible++;
129 }
130
131 // If there's no message being sent at the moment, move it to
132 // the active state.
133 if (!_more && _active < _pipes.size ()) {
134 _pipes.swap (_eligible - 1, _active);
135 _active++;
136 }
137}
138
139int zmq::dist_t::send_to_all (msg_t *msg_)
140{
141 _matching = _active;
142 return send_to_matching (msg_);
143}
144
145int zmq::dist_t::send_to_matching (msg_t *msg_)
146{
147 // Is this end of a multipart message?
148 bool msg_more = (msg_->flags () & msg_t::more) != 0;
149
150 // Push the message to matching pipes.
151 distribute (msg_);
152
153 // If multipart message is fully sent, activate all the eligible pipes.
154 if (!msg_more)
155 _active = _eligible;
156
157 _more = msg_more;
158
159 return 0;
160}
161
162void zmq::dist_t::distribute (msg_t *msg_)
163{
164 // If there are no matching pipes available, simply drop the message.
165 if (_matching == 0) {
166 int rc = msg_->close ();
167 errno_assert (rc == 0);
168 rc = msg_->init ();
169 errno_assert (rc == 0);
170 return;
171 }
172
173 if (msg_->is_vsm ()) {
174 for (pipes_t::size_type i = 0; i < _matching; ++i)
175 if (!write (_pipes[i], msg_))
176 --i; // Retry last write because index will have been swapped
177 int rc = msg_->close ();
178 errno_assert (rc == 0);
179 rc = msg_->init ();
180 errno_assert (rc == 0);
181 return;
182 }
183
184 // Add matching-1 references to the message. We already hold one reference,
185 // that's why -1.
186 msg_->add_refs (static_cast<int> (_matching) - 1);
187
188 // Push copy of the message to each matching pipe.
189 int failed = 0;
190 for (pipes_t::size_type i = 0; i < _matching; ++i)
191 if (!write (_pipes[i], msg_)) {
192 ++failed;
193 --i; // Retry last write because index will have been swapped
194 }
195 if (unlikely (failed))
196 msg_->rm_refs (failed);
197
198 // Detach the original message from the data buffer. Note that we don't
199 // close the message. That's because we've already used all the references.
200 int rc = msg_->init ();
201 errno_assert (rc == 0);
202}
203
204bool zmq::dist_t::has_out ()
205{
206 return true;
207}
208
209bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
210{
211 if (!pipe_->write (msg_)) {
212 _pipes.swap (_pipes.index (pipe_), _matching - 1);
213 _matching--;
214 _pipes.swap (_pipes.index (pipe_), _active - 1);
215 _active--;
216 _pipes.swap (_active, _eligible - 1);
217 _eligible--;
218 return false;
219 }
220 if (!(msg_->flags () & msg_t::more))
221 pipe_->flush ();
222 return true;
223}
224
225bool zmq::dist_t::check_hwm ()
226{
227 for (pipes_t::size_type i = 0; i < _matching; ++i)
228 if (!_pipes[i]->check_hwm ())
229 return false;
230
231 return true;
232}
233