1/*
2Copyright (c) 2018 Contributors as noted in the AUTHORS file
3
4This file is part of 0MQ.
5
60MQ is free software; you can redistribute it and/or modify it under
7the terms of the GNU Lesser General Public License as published by
8the Free Software Foundation; either version 3 of the License, or
9(at your option) any later version.
10
110MQ is distributed in the hope that it will be useful,
12but WITHOUT ANY WARRANTY; without even the implied warranty of
13MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14GNU Lesser General Public License for more details.
15
16You should have received a copy of the GNU Lesser General Public License
17along with this program. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "../tests/testutil.hpp"
21
22#include <poller.hpp>
23#include <i_poll_events.hpp>
24#include <ip.hpp>
25
26#include <unity.h>
27
28#ifndef _WIN32
29#include <unistd.h>
30#define closesocket close
31#endif
32
// Unity per-test setup hook; these tests need no shared fixture.
void setUp ()
{
    // intentionally empty
}
// Unity per-test teardown hook; each test cleans up after itself.
void tearDown ()
{
    // intentionally empty
}
39
40void test_create ()
41{
42 zmq::thread_ctx_t thread_ctx;
43 zmq::poller_t poller (thread_ctx);
44}
45
#if 0
// TODO: starting a poller that has no registered descriptors currently
// triggers an assertion. Decide whether this should be a valid use case
// before enabling this test.
void test_start_empty ()
{
    zmq::thread_ctx_t ctx;
    zmq::poller_t empty_poller (ctx);
    empty_poller.start ();
    msleep (SETTLE_TIME);
}
#endif
56
57struct test_events_t : zmq::i_poll_events
58{
59 test_events_t (zmq::fd_t fd_, zmq::poller_t &poller_) :
60 _fd (fd_),
61 _poller (poller_)
62 {
63 (void) _fd;
64 }
65
66 virtual void in_event ()
67 {
68 _poller.rm_fd (_handle);
69 _handle = (zmq::poller_t::handle_t) NULL;
70
71 // this must only be incremented after rm_fd
72 in_events.add (1);
73 }
74
75
76 virtual void out_event ()
77 {
78 // TODO
79 }
80
81
82 virtual void timer_event (int id_)
83 {
84 LIBZMQ_UNUSED (id_);
85 _poller.rm_fd (_handle);
86 _handle = (zmq::poller_t::handle_t) NULL;
87
88 // this must only be incremented after rm_fd
89 timer_events.add (1);
90 }
91
92 void set_handle (zmq::poller_t::handle_t handle_) { _handle = handle_; }
93
94 zmq::atomic_counter_t in_events, timer_events;
95
96 private:
97 zmq::fd_t _fd;
98 zmq::poller_t &_poller;
99 zmq::poller_t::handle_t _handle;
100};
101
102void wait_in_events (test_events_t &events_)
103{
104 void *watch = zmq_stopwatch_start ();
105 while (events_.in_events.get () < 1) {
106#ifdef ZMQ_BUILD_DRAFT
107 TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
108 zmq_stopwatch_intermediate (watch),
109 "Timeout waiting for in event");
110#endif
111 }
112 zmq_stopwatch_stop (watch);
113}
114
115void wait_timer_events (test_events_t &events_)
116{
117 void *watch = zmq_stopwatch_start ();
118 while (events_.timer_events.get () < 1) {
119#ifdef ZMQ_BUILD_DRAFT
120 TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
121 zmq_stopwatch_intermediate (watch),
122 "Timeout waiting for timer event");
123#endif
124 }
125 zmq_stopwatch_stop (watch);
126}
127
128void create_nonblocking_fdpair (zmq::fd_t *r_, zmq::fd_t *w_)
129{
130 int rc = zmq::make_fdpair (r_, w_);
131 TEST_ASSERT_EQUAL_INT (0, rc);
132 TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *r_);
133 TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *w_);
134 zmq::unblock_socket (*r_);
135 zmq::unblock_socket (*w_);
136}
137
138void send_signal (zmq::fd_t w_)
139{
140#if defined ZMQ_HAVE_EVENTFD
141 const uint64_t inc = 1;
142 ssize_t sz = write (w_, &inc, sizeof (inc));
143 assert (sz == sizeof (inc));
144#else
145 {
146 char msg[] = "test";
147 int rc = send (w_, msg, sizeof (msg), 0);
148 assert (rc == sizeof (msg));
149 }
150#endif
151}
152
153void close_fdpair (zmq::fd_t w_, zmq::fd_t r_)
154{
155 int rc = closesocket (w_);
156 TEST_ASSERT_EQUAL_INT (0, rc);
157#if !defined ZMQ_HAVE_EVENTFD
158 rc = closesocket (r_);
159 TEST_ASSERT_EQUAL_INT (0, rc);
160#else
161 LIBZMQ_UNUSED (r_);
162#endif
163}
164
165void test_add_fd_and_start_and_receive_data ()
166{
167 zmq::thread_ctx_t thread_ctx;
168 zmq::poller_t poller (thread_ctx);
169
170 zmq::fd_t r, w;
171 create_nonblocking_fdpair (&r, &w);
172
173 test_events_t events (r, poller);
174
175 zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
176 events.set_handle (handle);
177 poller.set_pollin (handle);
178 poller.start ();
179
180 send_signal (w);
181
182 wait_in_events (events);
183
184 // required cleanup
185 close_fdpair (w, r);
186}
187
188void test_add_fd_and_remove_by_timer ()
189{
190 zmq::fd_t r, w;
191 create_nonblocking_fdpair (&r, &w);
192
193 zmq::thread_ctx_t thread_ctx;
194 zmq::poller_t poller (thread_ctx);
195
196 test_events_t events (r, poller);
197
198 zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
199 events.set_handle (handle);
200
201 poller.add_timer (50, &events, 0);
202 poller.start ();
203
204 wait_timer_events (events);
205
206 // required cleanup
207 close_fdpair (w, r);
208}
209
#ifdef _WIN32
// Windows only: starts a non-blocking connect to a socket that is bound to
// a loopback port but not listening, registers the connecting socket for
// pollin and checks that an in event is delivered and that SO_ERROR then
// reports WSAECONNREFUSED.
void test_add_fd_with_pending_failing_connect ()
{
    zmq::thread_ctx_t thread_ctx;
    zmq::poller_t poller (thread_ctx);

    zmq::fd_t bind_socket = socket (AF_INET, SOCK_STREAM, 0);
    TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, bind_socket);
    sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    // port 0: the actual port is read back with getsockname below
    addr.sin_port = 0;
    TEST_ASSERT_EQUAL_INT (0, bind (bind_socket,
                                    reinterpret_cast<const sockaddr *> (&addr),
                                    sizeof (addr)));

    int addr_len = static_cast<int> (sizeof (addr));
    TEST_ASSERT_EQUAL_INT (0, getsockname (bind_socket,
                                           reinterpret_cast<sockaddr *> (&addr),
                                           &addr_len));

    zmq::fd_t connect_socket = socket (AF_INET, SOCK_STREAM, 0);
    TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, connect_socket);
    zmq::unblock_socket (connect_socket);

    // the non-blocking connect must not complete immediately
    TEST_ASSERT_EQUAL_INT (
      -1, connect (connect_socket, reinterpret_cast<const sockaddr *> (&addr),
                   sizeof (addr)));
    TEST_ASSERT_EQUAL_INT (WSAEWOULDBLOCK, WSAGetLastError ());

    test_events_t events (connect_socket, poller);

    zmq::poller_t::handle_t handle = poller.add_fd (connect_socket, &events);
    events.set_handle (handle);
    poller.set_pollin (handle);
    poller.start ();

    wait_in_events (events);

    int value;
    int value_len = sizeof (value);
    TEST_ASSERT_EQUAL_INT (0, getsockopt (connect_socket, SOL_SOCKET, SO_ERROR,
                                          reinterpret_cast<char *> (&value),
                                          &value_len));
    TEST_ASSERT_EQUAL_INT (WSAECONNREFUSED, value);

    // required cleanup; closesocket is the call for SOCKET handles and is
    // the name the rest of this file uses, so no macro remapping of close
    // is relied upon
    TEST_ASSERT_EQUAL_INT (0, closesocket (connect_socket));
    TEST_ASSERT_EQUAL_INT (0, closesocket (bind_socket));
}
#endif
259
260int main (void)
261{
262 UNITY_BEGIN ();
263
264 zmq::initialize_network ();
265 setup_test_environment ();
266
267 RUN_TEST (test_create);
268 RUN_TEST (test_add_fd_and_start_and_receive_data);
269 RUN_TEST (test_add_fd_and_remove_by_timer);
270
271#if defined _WIN32
272 RUN_TEST (test_add_fd_with_pending_failing_connect);
273#endif
274
275 zmq::shutdown_network ();
276
277 return UNITY_END ();
278}
279