1//
2// SocketReactorTest.cpp
3//
4// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
5// and Contributors.
6//
7// SPDX-License-Identifier: BSL-1.0
8//
9
10
11#include "SocketReactorTest.h"
12#include "Poco/CppUnit/TestCaller.h"
13#include "Poco/CppUnit/TestSuite.h"
14#include "Poco/Net/SocketReactor.h"
15#include "Poco/Net/SocketNotification.h"
16#include "Poco/Net/SocketConnector.h"
17#include "Poco/Net/SocketAcceptor.h"
18#include "Poco/Net/ParallelSocketAcceptor.h"
19#include "Poco/Net/StreamSocket.h"
20#include "Poco/Net/ServerSocket.h"
21#include "Poco/Net/SocketAddress.h"
22#include "Poco/Observer.h"
23#include "Poco/Exception.h"
24#include "Poco/Thread.h"
25#include <sstream>
26
27
28using Poco::Net::SocketReactor;
29using Poco::Net::SocketConnector;
30using Poco::Net::SocketAcceptor;
31using Poco::Net::ParallelSocketAcceptor;
32using Poco::Net::StreamSocket;
33using Poco::Net::ServerSocket;
34using Poco::Net::SocketAddress;
35using Poco::Net::SocketNotification;
36using Poco::Net::ReadableNotification;
37using Poco::Net::WritableNotification;
38using Poco::Net::TimeoutNotification;
39using Poco::Net::ShutdownNotification;
40using Poco::Observer;
41using Poco::IllegalStateException;
42using Poco::Thread;
43
44
45namespace
46{
47 class EchoServiceHandler
48 {
49 public:
50 EchoServiceHandler(const StreamSocket& socket, SocketReactor& reactor):
51 _socket(socket),
52 _reactor(reactor)
53 {
54 _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
55 }
56
57 ~EchoServiceHandler()
58 {
59 }
60
61 void onReadable(ReadableNotification* pNf)
62 {
63 pNf->release();
64 char buffer[8];
65 int n = _socket.receiveBytes(buffer, sizeof(buffer));
66 if (n > 0)
67 {
68 _socket.sendBytes(buffer, n);
69 }
70 else
71 {
72 _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
73 delete this;
74 }
75 }
76
77 private:
78 StreamSocket _socket;
79 SocketReactor& _reactor;
80 };
81
82 class ClientServiceHandler
83 {
84 public:
85 ClientServiceHandler(const StreamSocket& socket, SocketReactor& reactor):
86 _socket(socket),
87 _reactor(reactor),
88 _or(*this, &ClientServiceHandler::onReadable),
89 _ow(*this, &ClientServiceHandler::onWritable),
90 _ot(*this, &ClientServiceHandler::onTimeout)
91 {
92 _timeout = false;
93 _readableError = false;
94 _writableError = false;
95 _timeoutError = false;
96 checkReadableObserverCount(0);
97 _reactor.addEventHandler(_socket, _or);
98 checkReadableObserverCount(1);
99 checkWritableObserverCount(0);
100 _reactor.addEventHandler(_socket, _ow);
101 checkWritableObserverCount(1);
102 checkTimeoutObserverCount(0);
103 _reactor.addEventHandler(_socket, _ot);
104 checkTimeoutObserverCount(1);
105 }
106
107 ~ClientServiceHandler()
108 {
109 }
110
111 void onReadable(ReadableNotification* pNf)
112 {
113 pNf->release();
114 char buffer[32];
115 int n = _socket.receiveBytes(buffer, sizeof(buffer));
116 if (n > 0)
117 {
118 _str.write(buffer, n);
119 _data += _str.str();
120 _str.str("");
121 }
122 else
123 {
124 checkReadableObserverCount(1);
125 _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
126 checkReadableObserverCount(0);
127 if (_once || _data.size() == 8192)
128 {
129 _reactor.stop();
130 delete this;
131 }
132 }
133 }
134
135 void onWritable(WritableNotification* pNf)
136 {
137 pNf->release();
138 checkWritableObserverCount(1);
139 _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable));
140 checkWritableObserverCount(0);
141 std::string data(1024, 'x');
142 _socket.sendBytes(data.data(), (int) data.length());
143 _socket.shutdownSend();
144 }
145
146 void onTimeout(TimeoutNotification* pNf)
147 {
148 pNf->release();
149 _timeout = true;
150 if (_closeOnTimeout)
151 {
152 _reactor.stop();
153 delete this;
154 }
155 }
156
157 static std::string data()
158 {
159 return _data;
160 }
161
162 static void resetData()
163 {
164 _data.clear();
165 }
166
167 static bool timeout()
168 {
169 return _timeout;
170 }
171
172 static bool getCloseOnTimeout()
173 {
174 return _closeOnTimeout;
175 }
176
177 static void setCloseOnTimeout(bool flag)
178 {
179 _closeOnTimeout = flag;
180 }
181
182 static bool readableError()
183 {
184 return _readableError;
185 }
186
187 static bool writableError()
188 {
189 return _writableError;
190 }
191
192 static bool timeoutError()
193 {
194 return _timeoutError;
195 }
196
197 static void setOnce(bool once = true)
198 {
199 _once = once;
200 }
201
202 private:
203 void checkReadableObserverCount(std::size_t oro)
204 {
205 if (((oro == 0) && _reactor.hasEventHandler(_socket, _or)) ||
206 ((oro > 0) && !_reactor.hasEventHandler(_socket, _or)))
207 {
208 _readableError = true;
209 }
210 }
211
212 void checkWritableObserverCount(std::size_t ow)
213 {
214 if (((ow == 0) && _reactor.hasEventHandler(_socket, _ow)) ||
215 ((ow > 0) && !_reactor.hasEventHandler(_socket, _ow)))
216 {
217 _writableError = true;
218 }
219 }
220
221 void checkTimeoutObserverCount(std::size_t ot)
222 {
223 if (((ot == 0) && _reactor.hasEventHandler(_socket, _ot)) ||
224 ((ot > 0) && !_reactor.hasEventHandler(_socket, _ot)))
225 {
226 _timeoutError = true;
227 }
228 }
229
230 StreamSocket _socket;
231 SocketReactor& _reactor;
232 Observer<ClientServiceHandler, ReadableNotification> _or;
233 Observer<ClientServiceHandler, WritableNotification> _ow;
234 Observer<ClientServiceHandler, TimeoutNotification> _ot;
235 std::stringstream _str;
236 static std::string _data;
237 static bool _readableError;
238 static bool _writableError;
239 static bool _timeoutError;
240 static bool _timeout;
241 static bool _closeOnTimeout;
242 static bool _once;
243 };
244
245
246 std::string ClientServiceHandler::_data;
247 bool ClientServiceHandler::_readableError = false;
248 bool ClientServiceHandler::_writableError = false;
249 bool ClientServiceHandler::_timeoutError = false;
250 bool ClientServiceHandler::_timeout = false;
251 bool ClientServiceHandler::_closeOnTimeout = false;
252 bool ClientServiceHandler::_once = false;
253
254
255 class FailConnector: public SocketConnector<ClientServiceHandler>
256 {
257 public:
258 FailConnector(SocketAddress& address, SocketReactor& reactor):
259 SocketConnector<ClientServiceHandler>(address, reactor),
260 _failed(false),
261 _shutdown(false)
262 {
263 reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout));
264 reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown));
265 }
266
267 void onShutdown(ShutdownNotification* pNf)
268 {
269 pNf->release();
270 _shutdown = true;
271 }
272
273 void onTimeout(TimeoutNotification* pNf)
274 {
275 pNf->release();
276 _failed = true;
277 reactor()->stop();
278 }
279
280 void onError(int error)
281 {
282 _failed = true;
283 reactor()->stop();
284 }
285
286 bool failed() const
287 {
288 return _failed;
289 }
290
291 bool shutdown() const
292 {
293 return _shutdown;
294 }
295
296 private:
297 bool _failed;
298 bool _shutdown;
299 };
300
301 class DataServiceHandler
302 {
303 public:
304 typedef std::vector<std::string> Data;
305
306 DataServiceHandler(StreamSocket& socket, SocketReactor& reactor):
307 _socket(socket),
308 _reactor(reactor),
309 _pos(0)
310 {
311 _data.resize(1);
312 _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
313 _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
314 }
315
316 ~DataServiceHandler()
317 {
318 _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
319 _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
320 }
321
322 void onReadable(ReadableNotification* pNf)
323 {
324 pNf->release();
325 char buffer[64];
326 int n = _socket.receiveBytes(&buffer[0], sizeof(buffer));
327 if (n > 0)
328 {
329 _data[_pos].append(buffer, n);
330 std::size_t pos;
331 pos = _data[_pos].find('\n');
332 if(pos != std::string::npos)
333 {
334 if (pos == _data[_pos].size() - 1)
335 {
336 _data[_pos].erase(pos, 1);
337 _data.push_back(std::string());
338 }
339 else
340 {
341 _data.push_back(_data[_pos].substr(pos + 1));
342 _data[_pos].erase(pos);
343 }
344 ++_pos;
345 }
346 }
347 else return;
348 }
349
350 void onShutdown(ShutdownNotification* pNf)
351 {
352 pNf->release();
353 delete this;
354 }
355
356 static Data _data;
357
358 private:
359 StreamSocket _socket;
360 SocketReactor& _reactor;
361 int _pos;
362 };
363
364 DataServiceHandler::Data DataServiceHandler::_data;
365}
366
367
368SocketReactorTest::SocketReactorTest(const std::string& name): CppUnit::TestCase(name)
369{
370}
371
372
373SocketReactorTest::~SocketReactorTest()
374{
375}
376
377
378void SocketReactorTest::testSocketReactor()
379{
380 SocketAddress ssa;
381 ServerSocket ss(ssa);
382 SocketReactor reactor;
383 SocketAcceptor<EchoServiceHandler> acceptor(ss, reactor);
384 SocketAddress sa("127.0.0.1", ss.address().port());
385 SocketConnector<ClientServiceHandler> connector(sa, reactor);
386 ClientServiceHandler::setOnce(true);
387 ClientServiceHandler::resetData();
388 reactor.run();
389 std::string data(ClientServiceHandler::data());
390 assertTrue (data.size() == 1024);
391 assertTrue (!ClientServiceHandler::readableError());
392 assertTrue (!ClientServiceHandler::writableError());
393 assertTrue (!ClientServiceHandler::timeoutError());
394}
395
396
397void SocketReactorTest::testSetSocketReactor()
398{
399 SocketAddress ssa;
400 ServerSocket ss(ssa);
401 SocketReactor reactor;
402 SocketAcceptor<EchoServiceHandler> acceptor(ss);
403 acceptor.setReactor(reactor);
404 SocketAddress sa("127.0.0.1", ss.address().port());
405 SocketConnector<ClientServiceHandler> connector(sa, reactor);
406 ClientServiceHandler::setOnce(true);
407 ClientServiceHandler::resetData();
408 reactor.run();
409 std::string data(ClientServiceHandler::data());
410 assertTrue (data.size() == 1024);
411 assertTrue (!ClientServiceHandler::readableError());
412 assertTrue (!ClientServiceHandler::writableError());
413 assertTrue (!ClientServiceHandler::timeoutError());
414}
415
416
417void SocketReactorTest::testParallelSocketReactor()
418{
419 SocketAddress ssa;
420 ServerSocket ss(ssa);
421 SocketReactor reactor;
422 ParallelSocketAcceptor<EchoServiceHandler, SocketReactor> acceptor(ss, reactor);
423 SocketAddress sa("127.0.0.1", ss.address().port());
424 SocketConnector<ClientServiceHandler> connector1(sa, reactor);
425 SocketConnector<ClientServiceHandler> connector2(sa, reactor);
426 SocketConnector<ClientServiceHandler> connector3(sa, reactor);
427 SocketConnector<ClientServiceHandler> connector4(sa, reactor);
428 SocketConnector<ClientServiceHandler> connector5(sa, reactor);
429 SocketConnector<ClientServiceHandler> connector6(sa, reactor);
430 SocketConnector<ClientServiceHandler> connector7(sa, reactor);
431 SocketConnector<ClientServiceHandler> connector8(sa, reactor);
432 ClientServiceHandler::setOnce(false);
433 ClientServiceHandler::resetData();
434 reactor.run();
435 std::string data(ClientServiceHandler::data());
436 assertTrue (data.size() == 8192);
437 assertTrue (!ClientServiceHandler::readableError());
438 assertTrue (!ClientServiceHandler::writableError());
439 assertTrue (!ClientServiceHandler::timeoutError());
440}
441
442
443void SocketReactorTest::testSocketConnectorFail()
444{
445 SocketReactor reactor;
446 reactor.setTimeout(Poco::Timespan(3, 0));
447 SocketAddress sa("192.168.168.192", 12345);
448 FailConnector connector(sa, reactor);
449 assertTrue (!connector.failed());
450 assertTrue (!connector.shutdown());
451 reactor.run();
452 assertTrue (connector.failed());
453 assertTrue (connector.shutdown());
454}
455
456
457void SocketReactorTest::testSocketConnectorTimeout()
458{
459 ClientServiceHandler::setCloseOnTimeout(true);
460
461 SocketAddress ssa;
462 ServerSocket ss(ssa);
463 SocketReactor reactor;
464 SocketAddress sa("127.0.0.1", ss.address().port());
465 SocketConnector<ClientServiceHandler> connector(sa, reactor);
466 reactor.run();
467 assertTrue (ClientServiceHandler::timeout());
468}
469
470
471void SocketReactorTest::testDataCollection()
472{
473 SocketAddress ssa;
474 ServerSocket ss(ssa);
475 SocketReactor reactor;
476 SocketAcceptor<DataServiceHandler> acceptor(ss, reactor);
477 Thread thread;
478 thread.start(reactor);
479
480 SocketAddress sa("127.0.0.1", ss.address().port());
481 StreamSocket sock(sa);
482
483 std::string data0("{"
484 " \"src\":\"127.0.0.1\","
485 " \"id\":\"test0\","
486 " \"ts\":\"1524864651000001\","
487 " \"data\":123"
488 "}\n");
489 sock.sendBytes(data0.data(), static_cast<int>(data0.size()));
490
491 std::string data1("{"
492 " \"src\":\"127.0.0.1\","
493 " \"id\":\"test1\","
494 " \"ts\":\"1524864651123456\","
495 " \"data\":"
496 " ["
497 " {"
498 " \"tag1\":"
499 " ["
500 " {\"val1\":123},"
501 " {\"val2\":\"abc\"}"
502 " ]"
503 " }"
504 " ]"
505 "}\n");
506 sock.sendBytes(data1.data(), static_cast<int>(data1.size()));
507
508 std::string data2 = "{"
509 " \"src\":\"127.0.0.1\","
510 " \"id\":\"test2\","
511 " \"ts\":\"1524864652654321\","
512 " \"data\":"
513 " ["
514 " {"
515 " \"tag1\":"
516 " ["
517 " {"
518 " \"val1\":123,"
519 " \"val2\":\"abc\","
520 " \"val3\":42.123"
521 " },"
522 " {"
523 " \"val1\":987,"
524 " \"val2\":\"xyz\","
525 " \"val3\":24.321"
526 " }"
527 " ],"
528 " \"tag2\":"
529 " ["
530 " {"
531 " \"val1\":42.123,"
532 " \"val2\":123,"
533 " \"val3\":\"abc\""
534 " },"
535 " {"
536 " \"val1\":24.321,"
537 " \"val2\":987,"
538 " \"val3\":\"xyz\""
539 " }"
540 " ]"
541 " }"
542 " ]"
543 "}\n";
544 sock.sendBytes(data2.data(), static_cast<int>(data2.size()));
545 Thread::sleep(500);
546 reactor.stop();
547 thread.join();
548
549 assertTrue (DataServiceHandler::_data.size() == 4);
550 data0.erase(data0.size() - 1);
551 assertTrue (DataServiceHandler::_data[0] == data0);
552 data1.erase(data1.size() - 1);
553 assertTrue (DataServiceHandler::_data[1] == data1);
554 data2.erase(data2.size() - 1);
555 assertTrue (DataServiceHandler::_data[2] == data2);
556 assertTrue (DataServiceHandler::_data[3].empty());
557}
558
559
560void SocketReactorTest::setUp()
561{
562 ClientServiceHandler::setCloseOnTimeout(false);
563}
564
565
566void SocketReactorTest::tearDown()
567{
568}
569
570
571CppUnit::Test* SocketReactorTest::suite()
572{
573 CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
574
575 CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
576 CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor);
577 CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
578 CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
579 CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);
580 CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection);
581
582 return pSuite;
583}
584