1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "pollset.hpp"
32#if defined ZMQ_IOTHREAD_POLLER_USE_POLLSET
33
34#include <stdlib.h>
35#include <string.h>
36#include <unistd.h>
37#include <algorithm>
38#include <new>
39
40#include "macros.hpp"
41#include "err.hpp"
42#include "config.hpp"
43#include "i_poll_events.hpp"
44
45zmq::pollset_t::pollset_t (const zmq::thread_ctx_t &ctx_) :
46 ctx (ctx_),
47 stopping (false)
48{
49 pollset_fd = pollset_create (-1);
50 errno_assert (pollset_fd != -1);
51}
52
53zmq::pollset_t::~pollset_t ()
54{
55 // Wait till the worker thread exits.
56 worker.stop ();
57
58 pollset_destroy (pollset_fd);
59 for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it)
60 LIBZMQ_DELETE (*it);
61}
62
63zmq::pollset_t::handle_t zmq::pollset_t::add_fd (fd_t fd_,
64 i_poll_events *events_)
65{
66 poll_entry_t *pe = new (std::nothrow) poll_entry_t;
67 alloc_assert (pe);
68
69 pe->fd = fd_;
70 pe->flag_pollin = false;
71 pe->flag_pollout = false;
72 pe->events = events_;
73
74 struct poll_ctl pc;
75 pc.fd = fd_;
76 pc.cmd = PS_ADD;
77 pc.events = 0;
78
79 int rc = pollset_ctl (pollset_fd, &pc, 1);
80 errno_assert (rc != -1);
81
82 // Increase the load metric of the thread.
83 adjust_load (1);
84
85 if (fd_ >= fd_table.size ()) {
86 fd_table.resize (fd_ + 1, NULL);
87 }
88 fd_table[fd_] = pe;
89 return pe;
90}
91
92void zmq::pollset_t::rm_fd (handle_t handle_)
93{
94 poll_entry_t *pe = (poll_entry_t *) handle_;
95
96 struct poll_ctl pc;
97 pc.fd = pe->fd;
98 pc.cmd = PS_DELETE;
99 pc.events = 0;
100 pollset_ctl (pollset_fd, &pc, 1);
101
102 fd_table[pe->fd] = NULL;
103
104 pe->fd = retired_fd;
105 retired.push_back (pe);
106
107 // Decrease the load metric of the thread.
108 adjust_load (-1);
109}
110
111void zmq::pollset_t::set_pollin (handle_t handle_)
112{
113 poll_entry_t *pe = (poll_entry_t *) handle_;
114 if (likely (!pe->flag_pollin)) {
115 struct poll_ctl pc;
116 pc.fd = pe->fd;
117 pc.cmd = PS_MOD;
118 pc.events = POLLIN;
119
120 const int rc = pollset_ctl (pollset_fd, &pc, 1);
121 errno_assert (rc != -1);
122
123 pe->flag_pollin = true;
124 }
125}
126
127void zmq::pollset_t::reset_pollin (handle_t handle_)
128{
129 poll_entry_t *pe = (poll_entry_t *) handle_;
130 if (unlikely (!pe->flag_pollin)) {
131 return;
132 }
133
134 struct poll_ctl pc;
135 pc.fd = pe->fd;
136 pc.events = 0;
137
138 pc.cmd = PS_DELETE;
139 int rc = pollset_ctl (pollset_fd, &pc, 1);
140
141 if (pe->flag_pollout) {
142 pc.events = POLLOUT;
143 pc.cmd = PS_MOD;
144 rc = pollset_ctl (pollset_fd, &pc, 1);
145 errno_assert (rc != -1);
146 }
147
148 pe->flag_pollin = false;
149}
150
151void zmq::pollset_t::set_pollout (handle_t handle_)
152{
153 poll_entry_t *pe = (poll_entry_t *) handle_;
154 if (likely (!pe->flag_pollout)) {
155 struct poll_ctl pc;
156 pc.fd = pe->fd;
157 pc.cmd = PS_MOD;
158 pc.events = POLLOUT;
159
160 const int rc = pollset_ctl (pollset_fd, &pc, 1);
161 errno_assert (rc != -1);
162
163 pe->flag_pollout = true;
164 }
165}
166
167void zmq::pollset_t::reset_pollout (handle_t handle_)
168{
169 poll_entry_t *pe = (poll_entry_t *) handle_;
170 if (unlikely (!pe->flag_pollout)) {
171 return;
172 }
173
174 struct poll_ctl pc;
175 pc.fd = pe->fd;
176 pc.events = 0;
177
178 pc.cmd = PS_DELETE;
179 int rc = pollset_ctl (pollset_fd, &pc, 1);
180 errno_assert (rc != -1);
181
182 if (pe->flag_pollin) {
183 pc.cmd = PS_MOD;
184 pc.events = POLLIN;
185 rc = pollset_ctl (pollset_fd, &pc, 1);
186 errno_assert (rc != -1);
187 }
188 pe->flag_pollout = false;
189}
190
191void zmq::pollset_t::start ()
192{
193 ctx.start_thread (worker, worker_routine, this);
194}
195
196void zmq::pollset_t::stop ()
197{
198 stopping = true;
199}
200
201int zmq::pollset_t::max_fds ()
202{
203 return -1;
204}
205
206void zmq::pollset_t::loop ()
207{
208 struct pollfd polldata_array[max_io_events];
209
210 while (!stopping) {
211 // Execute any due timers.
212 int timeout = (int) execute_timers ();
213
214 // Wait for events.
215 int n = pollset_poll (pollset_fd, polldata_array, max_io_events,
216 timeout ? timeout : -1);
217 if (n == -1) {
218 errno_assert (errno == EINTR);
219 continue;
220 }
221
222 for (int i = 0; i < n; i++) {
223 poll_entry_t *pe = fd_table[polldata_array[i].fd];
224 if (!pe)
225 continue;
226
227 if (pe->fd == retired_fd)
228 continue;
229 if (polldata_array[i].revents & (POLLERR | POLLHUP))
230 pe->events->in_event ();
231 if (pe->fd == retired_fd)
232 continue;
233 if (polldata_array[i].revents & POLLOUT)
234 pe->events->out_event ();
235 if (pe->fd == retired_fd)
236 continue;
237 if (polldata_array[i].revents & POLLIN)
238 pe->events->in_event ();
239 }
240
241 // Destroy retired event sources.
242 for (retired_t::iterator it = retired.begin (); it != retired.end ();
243 ++it)
244 LIBZMQ_DELETE (*it);
245 retired.clear ();
246 }
247}
248
249void zmq::pollset_t::worker_routine (void *arg_)
250{
251 ((pollset_t *) arg_)->loop ();
252}
253
254#endif
255