1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <stdlib.h>
34
35SETUP_TEARDOWN_TESTCONTEXT
36
37char connect_address[MAX_SOCKET_STRING];
38
39void test_fair_queue_in (const char *bind_address_)
40{
41 void *rep = test_context_socket (ZMQ_REP);
42
43 int timeout = 250;
44 TEST_ASSERT_SUCCESS_ERRNO (
45 zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
46
47 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address_));
48 size_t len = MAX_SOCKET_STRING;
49 TEST_ASSERT_SUCCESS_ERRNO (
50 zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));
51
52 const size_t services = 5;
53 void *reqs[services];
54 for (size_t peer = 0; peer < services; ++peer) {
55 reqs[peer] = test_context_socket (ZMQ_REQ);
56
57 TEST_ASSERT_SUCCESS_ERRNO (
58 zmq_setsockopt (reqs[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
59 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (reqs[peer], connect_address));
60 }
61
62 msleep (SETTLE_TIME);
63
64 s_send_seq (reqs[0], "A", SEQ_END);
65 s_recv_seq (rep, "A", SEQ_END);
66 s_send_seq (rep, "A", SEQ_END);
67 s_recv_seq (reqs[0], "A", SEQ_END);
68
69 s_send_seq (reqs[0], "A", SEQ_END);
70 s_recv_seq (rep, "A", SEQ_END);
71 s_send_seq (rep, "A", SEQ_END);
72 s_recv_seq (reqs[0], "A", SEQ_END);
73
74 // TODO: following test fails randomly on some boxes
75#ifdef SOMEONE_FIXES_THIS
76 // send N requests
77 for (size_t peer = 0; peer < services; ++peer) {
78 char *str = strdup ("A");
79 str[0] += peer;
80 s_send_seq (reqs[peer], str, SEQ_END);
81 free (str);
82 }
83
84 // handle N requests
85 for (size_t peer = 0; peer < services; ++peer) {
86 char *str = strdup ("A");
87 str[0] += peer;
88 // Test fails here
89 s_recv_seq (rep, str, SEQ_END);
90 s_send_seq (rep, str, SEQ_END);
91 s_recv_seq (reqs[peer], str, SEQ_END);
92 free (str);
93 }
94#endif
95 test_context_socket_close_zero_linger (rep);
96
97 for (size_t peer = 0; peer < services; ++peer)
98 test_context_socket_close_zero_linger (reqs[peer]);
99}
100
101void test_envelope (const char *bind_address_)
102{
103 void *rep = test_context_socket (ZMQ_REP);
104
105 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address_));
106 size_t len = MAX_SOCKET_STRING;
107 TEST_ASSERT_SUCCESS_ERRNO (
108 zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));
109
110 void *dealer = test_context_socket (ZMQ_DEALER);
111
112 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address));
113
114 // minimal envelope
115 s_send_seq (dealer, 0, "A", SEQ_END);
116 s_recv_seq (rep, "A", SEQ_END);
117 s_send_seq (rep, "A", SEQ_END);
118 s_recv_seq (dealer, 0, "A", SEQ_END);
119
120 // big envelope
121 s_send_seq (dealer, "X", "Y", 0, "A", SEQ_END);
122 s_recv_seq (rep, "A", SEQ_END);
123 s_send_seq (rep, "A", SEQ_END);
124 s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END);
125
126 test_context_socket_close_zero_linger (rep);
127 test_context_socket_close_zero_linger (dealer);
128}
129
130const char bind_inproc[] = "inproc://a";
131const char bind_tcp[] = "tcp://127.0.0.1:*";
132
133void test_fair_queue_in_inproc ()
134{
135 test_fair_queue_in (bind_inproc);
136}
137
138void test_fair_queue_in_tcp ()
139{
140 test_fair_queue_in (bind_tcp);
141}
142
143void test_envelope_inproc ()
144{
145 test_envelope (bind_inproc);
146}
147
148void test_envelope_tcp ()
149{
150 test_envelope (bind_tcp);
151}
152
153int main ()
154{
155 setup_test_environment ();
156
157 UNITY_BEGIN ();
158
159 // SHALL receive incoming messages from its peers using a fair-queuing
160 // strategy.
161 RUN_TEST (test_fair_queue_in_inproc);
162 RUN_TEST (test_fair_queue_in_tcp);
163
164 // For an incoming message:
165 // SHALL remove and store the address envelope, including the delimiter.
166 // SHALL pass the remaining data frames to its calling application.
167 // SHALL wait for a single reply message from its calling application.
168 // SHALL prepend the address envelope and delimiter.
169 // SHALL deliver this message back to the originating peer.
170 RUN_TEST (test_envelope_inproc);
171 RUN_TEST (test_envelope_tcp);
172
173 return UNITY_END ();
174}
175