1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35SETUP_TEARDOWN_TESTCONTEXT
36
37/// Initialize a zeromq message with a given null-terminated string
38#define ZMQ_PREPARE_STRING(msg, data, size) \
39 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); \
40 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, size + 1)); \
41 memcpy (zmq_msg_data (&msg), data, size + 1);
42
43static int publicationsReceived = 0;
44static bool isSubscribed = false;
45
46void test_disconnect_inproc ()
47{
48 void *pub_socket = test_context_socket (ZMQ_XPUB);
49 void *sub_socket = test_context_socket (ZMQ_SUB);
50 TEST_ASSERT_SUCCESS_ERRNO (
51 zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, "foo", 3));
52
53 TEST_ASSERT_SUCCESS_ERRNO (
54 zmq_bind (pub_socket, "inproc://someInProcDescriptor"));
55
56 int more;
57 size_t more_size = sizeof (more);
58
59 for (int iteration = 0;; ++iteration) {
60 zmq_pollitem_t items[] = {
61 {sub_socket, 0, ZMQ_POLLIN, 0}, // read publications
62 {pub_socket, 0, ZMQ_POLLIN, 0}, // read subscriptions
63 };
64 int rc = zmq_poll (items, 2, 100);
65
66 if (items[1].revents & ZMQ_POLLIN) {
67 for (more = 1; more;) {
68 zmq_msg_t msg;
69 zmq_msg_init (&msg);
70 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pub_socket, 0));
71 char *buffer = static_cast<char *> (zmq_msg_data (&msg));
72
73 if (buffer[0] == 0) {
74 TEST_ASSERT_TRUE (isSubscribed);
75 isSubscribed = false;
76 } else {
77 TEST_ASSERT_FALSE (isSubscribed);
78 isSubscribed = true;
79 }
80
81 TEST_ASSERT_SUCCESS_ERRNO (
82 zmq_getsockopt (pub_socket, ZMQ_RCVMORE, &more, &more_size));
83 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
84 }
85 }
86
87 if (items[0].revents & ZMQ_POLLIN) {
88 more = 1;
89 for (more = 1; more;) {
90 zmq_msg_t msg;
91 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
92 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sub_socket, 0));
93 TEST_ASSERT_SUCCESS_ERRNO (
94 zmq_getsockopt (sub_socket, ZMQ_RCVMORE, &more, &more_size));
95 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
96 }
97 publicationsReceived++;
98 }
99 if (iteration == 1) {
100 TEST_ASSERT_SUCCESS_ERRNO (
101 zmq_connect (sub_socket, "inproc://someInProcDescriptor"));
102 msleep (SETTLE_TIME);
103 }
104 if (iteration == 4) {
105 TEST_ASSERT_SUCCESS_ERRNO (
106 zmq_disconnect (sub_socket, "inproc://someInProcDescriptor"));
107 }
108 if (iteration > 4 && rc == 0)
109 break;
110
111 zmq_msg_t channel_envlp;
112 ZMQ_PREPARE_STRING (channel_envlp, "foo", 3);
113 TEST_ASSERT_SUCCESS_ERRNO (
114 zmq_msg_send (&channel_envlp, pub_socket, ZMQ_SNDMORE));
115 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&channel_envlp));
116
117 zmq_msg_t message;
118 ZMQ_PREPARE_STRING (message, "this is foo!", 12);
119 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&message, pub_socket, 0));
120 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&message));
121 }
122 TEST_ASSERT_EQUAL_INT (3, publicationsReceived);
123 TEST_ASSERT_FALSE (isSubscribed);
124
125 test_context_socket_close (pub_socket);
126 test_context_socket_close (sub_socket);
127}
128
129int main (int, char **)
130{
131 setup_test_environment ();
132
133 UNITY_BEGIN ();
134 RUN_TEST (test_disconnect_inproc);
135 return UNITY_END ();
136}
137