1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "server.hpp"
33#include "pipe.hpp"
34#include "wire.hpp"
35#include "random.hpp"
36#include "likely.hpp"
37#include "err.hpp"
38
39zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 socket_base_t (parent_, tid_, sid_, true),
41 _next_routing_id (generate_random ())
42{
43 options.type = ZMQ_SERVER;
44}
45
46zmq::server_t::~server_t ()
47{
48 zmq_assert (_out_pipes.empty ());
49}
50
51void zmq::server_t::xattach_pipe (pipe_t *pipe_,
52 bool subscribe_to_all_,
53 bool locally_initiated_)
54{
55 LIBZMQ_UNUSED (subscribe_to_all_);
56 LIBZMQ_UNUSED (locally_initiated_);
57
58 zmq_assert (pipe_);
59
60 uint32_t routing_id = _next_routing_id++;
61 if (!routing_id)
62 routing_id = _next_routing_id++; // Never use Routing ID zero
63
64 pipe_->set_server_socket_routing_id (routing_id);
65 // Add the record into output pipes lookup table
66 outpipe_t outpipe = {pipe_, true};
67 bool ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (routing_id, outpipe).second;
68 zmq_assert (ok);
69
70 _fq.attach (pipe_);
71}
72
73void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
74{
75 out_pipes_t::iterator it =
76 _out_pipes.find (pipe_->get_server_socket_routing_id ());
77 zmq_assert (it != _out_pipes.end ());
78 _out_pipes.erase (it);
79 _fq.pipe_terminated (pipe_);
80}
81
82void zmq::server_t::xread_activated (pipe_t *pipe_)
83{
84 _fq.activated (pipe_);
85}
86
87void zmq::server_t::xwrite_activated (pipe_t *pipe_)
88{
89 const out_pipes_t::iterator end = _out_pipes.end ();
90 out_pipes_t::iterator it;
91 for (it = _out_pipes.begin (); it != end; ++it)
92 if (it->second.pipe == pipe_)
93 break;
94
95 zmq_assert (it != _out_pipes.end ());
96 zmq_assert (!it->second.active);
97 it->second.active = true;
98}
99
100int zmq::server_t::xsend (msg_t *msg_)
101{
102 // SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
103 if (msg_->flags () & msg_t::more) {
104 errno = EINVAL;
105 return -1;
106 }
107 // Find the pipe associated with the routing stored in the message.
108 uint32_t routing_id = msg_->get_routing_id ();
109 out_pipes_t::iterator it = _out_pipes.find (routing_id);
110
111 if (it != _out_pipes.end ()) {
112 if (!it->second.pipe->check_write ()) {
113 it->second.active = false;
114 errno = EAGAIN;
115 return -1;
116 }
117 } else {
118 errno = EHOSTUNREACH;
119 return -1;
120 }
121
122 // Message might be delivered over inproc, so we reset routing id
123 int rc = msg_->reset_routing_id ();
124 errno_assert (rc == 0);
125
126 bool ok = it->second.pipe->write (msg_);
127 if (unlikely (!ok)) {
128 // Message failed to send - we must close it ourselves.
129 rc = msg_->close ();
130 errno_assert (rc == 0);
131 } else
132 it->second.pipe->flush ();
133
134 // Detach the message from the data buffer.
135 rc = msg_->init ();
136 errno_assert (rc == 0);
137
138 return 0;
139}
140
141int zmq::server_t::xrecv (msg_t *msg_)
142{
143 pipe_t *pipe = NULL;
144 int rc = _fq.recvpipe (msg_, &pipe);
145
146 // Drop any messages with more flag
147 while (rc == 0 && msg_->flags () & msg_t::more) {
148 // drop all frames of the current multi-frame message
149 rc = _fq.recvpipe (msg_, NULL);
150
151 while (rc == 0 && msg_->flags () & msg_t::more)
152 rc = _fq.recvpipe (msg_, NULL);
153
154 // get the new message
155 if (rc == 0)
156 rc = _fq.recvpipe (msg_, &pipe);
157 }
158
159 if (rc != 0)
160 return rc;
161
162 zmq_assert (pipe != NULL);
163
164 uint32_t routing_id = pipe->get_server_socket_routing_id ();
165 msg_->set_routing_id (routing_id);
166
167 return 0;
168}
169
170bool zmq::server_t::xhas_in ()
171{
172 return _fq.has_in ();
173}
174
175bool zmq::server_t::xhas_out ()
176{
177 // In theory, SERVER socket is always ready for writing. Whether actual
178 // attempt to write succeeds depends on which pipe the message is going
179 // to be routed to.
180 return true;
181}
182