1 | // |
2 | // SocketReactorTest.cpp |
3 | // |
4 | // Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. |
5 | // and Contributors. |
6 | // |
7 | // SPDX-License-Identifier: BSL-1.0 |
8 | // |
9 | |
10 | |
11 | #include "SocketReactorTest.h" |
12 | #include "Poco/CppUnit/TestCaller.h" |
13 | #include "Poco/CppUnit/TestSuite.h" |
14 | #include "Poco/Net/SocketReactor.h" |
15 | #include "Poco/Net/SocketNotification.h" |
16 | #include "Poco/Net/SocketConnector.h" |
17 | #include "Poco/Net/SocketAcceptor.h" |
18 | #include "Poco/Net/ParallelSocketAcceptor.h" |
19 | #include "Poco/Net/StreamSocket.h" |
20 | #include "Poco/Net/ServerSocket.h" |
21 | #include "Poco/Net/SocketAddress.h" |
22 | #include "Poco/Observer.h" |
23 | #include "Poco/Exception.h" |
24 | #include "Poco/Thread.h" |
25 | #include <sstream> |
26 | |
27 | |
28 | using Poco::Net::SocketReactor; |
29 | using Poco::Net::SocketConnector; |
30 | using Poco::Net::SocketAcceptor; |
31 | using Poco::Net::ParallelSocketAcceptor; |
32 | using Poco::Net::StreamSocket; |
33 | using Poco::Net::ServerSocket; |
34 | using Poco::Net::SocketAddress; |
35 | using Poco::Net::SocketNotification; |
36 | using Poco::Net::ReadableNotification; |
37 | using Poco::Net::WritableNotification; |
38 | using Poco::Net::TimeoutNotification; |
39 | using Poco::Net::ShutdownNotification; |
40 | using Poco::Observer; |
41 | using Poco::IllegalStateException; |
42 | using Poco::Thread; |
43 | |
44 | |
45 | namespace |
46 | { |
47 | class EchoServiceHandler |
48 | { |
49 | public: |
50 | EchoServiceHandler(const StreamSocket& socket, SocketReactor& reactor): |
51 | _socket(socket), |
52 | _reactor(reactor) |
53 | { |
54 | _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); |
55 | } |
56 | |
57 | ~EchoServiceHandler() |
58 | { |
59 | } |
60 | |
61 | void onReadable(ReadableNotification* pNf) |
62 | { |
63 | pNf->release(); |
64 | char buffer[8]; |
65 | int n = _socket.receiveBytes(buffer, sizeof(buffer)); |
66 | if (n > 0) |
67 | { |
68 | _socket.sendBytes(buffer, n); |
69 | } |
70 | else |
71 | { |
72 | _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); |
73 | delete this; |
74 | } |
75 | } |
76 | |
77 | private: |
78 | StreamSocket _socket; |
79 | SocketReactor& _reactor; |
80 | }; |
81 | |
82 | class ClientServiceHandler |
83 | { |
84 | public: |
85 | ClientServiceHandler(const StreamSocket& socket, SocketReactor& reactor): |
86 | _socket(socket), |
87 | _reactor(reactor), |
88 | _or(*this, &ClientServiceHandler::onReadable), |
89 | _ow(*this, &ClientServiceHandler::onWritable), |
90 | _ot(*this, &ClientServiceHandler::onTimeout) |
91 | { |
92 | _timeout = false; |
93 | _readableError = false; |
94 | _writableError = false; |
95 | _timeoutError = false; |
96 | checkReadableObserverCount(0); |
97 | _reactor.addEventHandler(_socket, _or); |
98 | checkReadableObserverCount(1); |
99 | checkWritableObserverCount(0); |
100 | _reactor.addEventHandler(_socket, _ow); |
101 | checkWritableObserverCount(1); |
102 | checkTimeoutObserverCount(0); |
103 | _reactor.addEventHandler(_socket, _ot); |
104 | checkTimeoutObserverCount(1); |
105 | } |
106 | |
107 | ~ClientServiceHandler() |
108 | { |
109 | } |
110 | |
111 | void onReadable(ReadableNotification* pNf) |
112 | { |
113 | pNf->release(); |
114 | char buffer[32]; |
115 | int n = _socket.receiveBytes(buffer, sizeof(buffer)); |
116 | if (n > 0) |
117 | { |
118 | _str.write(buffer, n); |
119 | _data += _str.str(); |
120 | _str.str("" ); |
121 | } |
122 | else |
123 | { |
124 | checkReadableObserverCount(1); |
125 | _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); |
126 | checkReadableObserverCount(0); |
127 | if (_once || _data.size() == 8192) |
128 | { |
129 | _reactor.stop(); |
130 | delete this; |
131 | } |
132 | } |
133 | } |
134 | |
135 | void onWritable(WritableNotification* pNf) |
136 | { |
137 | pNf->release(); |
138 | checkWritableObserverCount(1); |
139 | _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable)); |
140 | checkWritableObserverCount(0); |
141 | std::string data(1024, 'x'); |
142 | _socket.sendBytes(data.data(), (int) data.length()); |
143 | _socket.shutdownSend(); |
144 | } |
145 | |
146 | void onTimeout(TimeoutNotification* pNf) |
147 | { |
148 | pNf->release(); |
149 | _timeout = true; |
150 | if (_closeOnTimeout) |
151 | { |
152 | _reactor.stop(); |
153 | delete this; |
154 | } |
155 | } |
156 | |
157 | static std::string data() |
158 | { |
159 | return _data; |
160 | } |
161 | |
162 | static void resetData() |
163 | { |
164 | _data.clear(); |
165 | } |
166 | |
167 | static bool timeout() |
168 | { |
169 | return _timeout; |
170 | } |
171 | |
172 | static bool getCloseOnTimeout() |
173 | { |
174 | return _closeOnTimeout; |
175 | } |
176 | |
177 | static void setCloseOnTimeout(bool flag) |
178 | { |
179 | _closeOnTimeout = flag; |
180 | } |
181 | |
182 | static bool readableError() |
183 | { |
184 | return _readableError; |
185 | } |
186 | |
187 | static bool writableError() |
188 | { |
189 | return _writableError; |
190 | } |
191 | |
192 | static bool timeoutError() |
193 | { |
194 | return _timeoutError; |
195 | } |
196 | |
197 | static void setOnce(bool once = true) |
198 | { |
199 | _once = once; |
200 | } |
201 | |
202 | private: |
203 | void checkReadableObserverCount(std::size_t oro) |
204 | { |
205 | if (((oro == 0) && _reactor.hasEventHandler(_socket, _or)) || |
206 | ((oro > 0) && !_reactor.hasEventHandler(_socket, _or))) |
207 | { |
208 | _readableError = true; |
209 | } |
210 | } |
211 | |
212 | void checkWritableObserverCount(std::size_t ow) |
213 | { |
214 | if (((ow == 0) && _reactor.hasEventHandler(_socket, _ow)) || |
215 | ((ow > 0) && !_reactor.hasEventHandler(_socket, _ow))) |
216 | { |
217 | _writableError = true; |
218 | } |
219 | } |
220 | |
221 | void checkTimeoutObserverCount(std::size_t ot) |
222 | { |
223 | if (((ot == 0) && _reactor.hasEventHandler(_socket, _ot)) || |
224 | ((ot > 0) && !_reactor.hasEventHandler(_socket, _ot))) |
225 | { |
226 | _timeoutError = true; |
227 | } |
228 | } |
229 | |
230 | StreamSocket _socket; |
231 | SocketReactor& _reactor; |
232 | Observer<ClientServiceHandler, ReadableNotification> _or; |
233 | Observer<ClientServiceHandler, WritableNotification> _ow; |
234 | Observer<ClientServiceHandler, TimeoutNotification> _ot; |
235 | std::stringstream _str; |
236 | static std::string _data; |
237 | static bool _readableError; |
238 | static bool _writableError; |
239 | static bool _timeoutError; |
240 | static bool _timeout; |
241 | static bool _closeOnTimeout; |
242 | static bool _once; |
243 | }; |
244 | |
245 | |
246 | std::string ClientServiceHandler::_data; |
247 | bool ClientServiceHandler::_readableError = false; |
248 | bool ClientServiceHandler::_writableError = false; |
249 | bool ClientServiceHandler::_timeoutError = false; |
250 | bool ClientServiceHandler::_timeout = false; |
251 | bool ClientServiceHandler::_closeOnTimeout = false; |
252 | bool ClientServiceHandler::_once = false; |
253 | |
254 | |
255 | class FailConnector: public SocketConnector<ClientServiceHandler> |
256 | { |
257 | public: |
258 | FailConnector(SocketAddress& address, SocketReactor& reactor): |
259 | SocketConnector<ClientServiceHandler>(address, reactor), |
260 | _failed(false), |
261 | _shutdown(false) |
262 | { |
263 | reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout)); |
264 | reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown)); |
265 | } |
266 | |
267 | void onShutdown(ShutdownNotification* pNf) |
268 | { |
269 | pNf->release(); |
270 | _shutdown = true; |
271 | } |
272 | |
273 | void onTimeout(TimeoutNotification* pNf) |
274 | { |
275 | pNf->release(); |
276 | _failed = true; |
277 | reactor()->stop(); |
278 | } |
279 | |
280 | void onError(int error) |
281 | { |
282 | _failed = true; |
283 | reactor()->stop(); |
284 | } |
285 | |
286 | bool failed() const |
287 | { |
288 | return _failed; |
289 | } |
290 | |
291 | bool shutdown() const |
292 | { |
293 | return _shutdown; |
294 | } |
295 | |
296 | private: |
297 | bool _failed; |
298 | bool _shutdown; |
299 | }; |
300 | |
301 | class DataServiceHandler |
302 | { |
303 | public: |
304 | typedef std::vector<std::string> Data; |
305 | |
306 | DataServiceHandler(StreamSocket& socket, SocketReactor& reactor): |
307 | _socket(socket), |
308 | _reactor(reactor), |
309 | _pos(0) |
310 | { |
311 | _data.resize(1); |
312 | _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable)); |
313 | _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown)); |
314 | } |
315 | |
316 | ~DataServiceHandler() |
317 | { |
318 | _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable)); |
319 | _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown)); |
320 | } |
321 | |
322 | void onReadable(ReadableNotification* pNf) |
323 | { |
324 | pNf->release(); |
325 | char buffer[64]; |
326 | int n = _socket.receiveBytes(&buffer[0], sizeof(buffer)); |
327 | if (n > 0) |
328 | { |
329 | _data[_pos].append(buffer, n); |
330 | std::size_t pos; |
331 | pos = _data[_pos].find('\n'); |
332 | if(pos != std::string::npos) |
333 | { |
334 | if (pos == _data[_pos].size() - 1) |
335 | { |
336 | _data[_pos].erase(pos, 1); |
337 | _data.push_back(std::string()); |
338 | } |
339 | else |
340 | { |
341 | _data.push_back(_data[_pos].substr(pos + 1)); |
342 | _data[_pos].erase(pos); |
343 | } |
344 | ++_pos; |
345 | } |
346 | } |
347 | else return; |
348 | } |
349 | |
350 | void onShutdown(ShutdownNotification* pNf) |
351 | { |
352 | pNf->release(); |
353 | delete this; |
354 | } |
355 | |
356 | static Data _data; |
357 | |
358 | private: |
359 | StreamSocket _socket; |
360 | SocketReactor& _reactor; |
361 | int _pos; |
362 | }; |
363 | |
364 | DataServiceHandler::Data DataServiceHandler::_data; |
365 | } |
366 | |
367 | |
368 | SocketReactorTest::SocketReactorTest(const std::string& name): CppUnit::TestCase(name) |
369 | { |
370 | } |
371 | |
372 | |
373 | SocketReactorTest::~SocketReactorTest() |
374 | { |
375 | } |
376 | |
377 | |
378 | void SocketReactorTest::testSocketReactor() |
379 | { |
380 | SocketAddress ssa; |
381 | ServerSocket ss(ssa); |
382 | SocketReactor reactor; |
383 | SocketAcceptor<EchoServiceHandler> acceptor(ss, reactor); |
384 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
385 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
386 | ClientServiceHandler::setOnce(true); |
387 | ClientServiceHandler::resetData(); |
388 | reactor.run(); |
389 | std::string data(ClientServiceHandler::data()); |
390 | assertTrue (data.size() == 1024); |
391 | assertTrue (!ClientServiceHandler::readableError()); |
392 | assertTrue (!ClientServiceHandler::writableError()); |
393 | assertTrue (!ClientServiceHandler::timeoutError()); |
394 | } |
395 | |
396 | |
397 | void SocketReactorTest::testSetSocketReactor() |
398 | { |
399 | SocketAddress ssa; |
400 | ServerSocket ss(ssa); |
401 | SocketReactor reactor; |
402 | SocketAcceptor<EchoServiceHandler> acceptor(ss); |
403 | acceptor.setReactor(reactor); |
404 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
405 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
406 | ClientServiceHandler::setOnce(true); |
407 | ClientServiceHandler::resetData(); |
408 | reactor.run(); |
409 | std::string data(ClientServiceHandler::data()); |
410 | assertTrue (data.size() == 1024); |
411 | assertTrue (!ClientServiceHandler::readableError()); |
412 | assertTrue (!ClientServiceHandler::writableError()); |
413 | assertTrue (!ClientServiceHandler::timeoutError()); |
414 | } |
415 | |
416 | |
417 | void SocketReactorTest::testParallelSocketReactor() |
418 | { |
419 | SocketAddress ssa; |
420 | ServerSocket ss(ssa); |
421 | SocketReactor reactor; |
422 | ParallelSocketAcceptor<EchoServiceHandler, SocketReactor> acceptor(ss, reactor); |
423 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
424 | SocketConnector<ClientServiceHandler> connector1(sa, reactor); |
425 | SocketConnector<ClientServiceHandler> connector2(sa, reactor); |
426 | SocketConnector<ClientServiceHandler> connector3(sa, reactor); |
427 | SocketConnector<ClientServiceHandler> connector4(sa, reactor); |
428 | SocketConnector<ClientServiceHandler> connector5(sa, reactor); |
429 | SocketConnector<ClientServiceHandler> connector6(sa, reactor); |
430 | SocketConnector<ClientServiceHandler> connector7(sa, reactor); |
431 | SocketConnector<ClientServiceHandler> connector8(sa, reactor); |
432 | ClientServiceHandler::setOnce(false); |
433 | ClientServiceHandler::resetData(); |
434 | reactor.run(); |
435 | std::string data(ClientServiceHandler::data()); |
436 | assertTrue (data.size() == 8192); |
437 | assertTrue (!ClientServiceHandler::readableError()); |
438 | assertTrue (!ClientServiceHandler::writableError()); |
439 | assertTrue (!ClientServiceHandler::timeoutError()); |
440 | } |
441 | |
442 | |
443 | void SocketReactorTest::testSocketConnectorFail() |
444 | { |
445 | SocketReactor reactor; |
446 | reactor.setTimeout(Poco::Timespan(3, 0)); |
447 | SocketAddress sa("192.168.168.192" , 12345); |
448 | FailConnector connector(sa, reactor); |
449 | assertTrue (!connector.failed()); |
450 | assertTrue (!connector.shutdown()); |
451 | reactor.run(); |
452 | assertTrue (connector.failed()); |
453 | assertTrue (connector.shutdown()); |
454 | } |
455 | |
456 | |
457 | void SocketReactorTest::testSocketConnectorTimeout() |
458 | { |
459 | ClientServiceHandler::setCloseOnTimeout(true); |
460 | |
461 | SocketAddress ssa; |
462 | ServerSocket ss(ssa); |
463 | SocketReactor reactor; |
464 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
465 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
466 | reactor.run(); |
467 | assertTrue (ClientServiceHandler::timeout()); |
468 | } |
469 | |
470 | |
471 | void SocketReactorTest::testDataCollection() |
472 | { |
473 | SocketAddress ssa; |
474 | ServerSocket ss(ssa); |
475 | SocketReactor reactor; |
476 | SocketAcceptor<DataServiceHandler> acceptor(ss, reactor); |
477 | Thread thread; |
478 | thread.start(reactor); |
479 | |
480 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
481 | StreamSocket sock(sa); |
482 | |
483 | std::string data0("{" |
484 | " \"src\":\"127.0.0.1\"," |
485 | " \"id\":\"test0\"," |
486 | " \"ts\":\"1524864651000001\"," |
487 | " \"data\":123" |
488 | "}\n" ); |
489 | sock.sendBytes(data0.data(), static_cast<int>(data0.size())); |
490 | |
491 | std::string data1("{" |
492 | " \"src\":\"127.0.0.1\"," |
493 | " \"id\":\"test1\"," |
494 | " \"ts\":\"1524864651123456\"," |
495 | " \"data\":" |
496 | " [" |
497 | " {" |
498 | " \"tag1\":" |
499 | " [" |
500 | " {\"val1\":123}," |
501 | " {\"val2\":\"abc\"}" |
502 | " ]" |
503 | " }" |
504 | " ]" |
505 | "}\n" ); |
506 | sock.sendBytes(data1.data(), static_cast<int>(data1.size())); |
507 | |
508 | std::string data2 = "{" |
509 | " \"src\":\"127.0.0.1\"," |
510 | " \"id\":\"test2\"," |
511 | " \"ts\":\"1524864652654321\"," |
512 | " \"data\":" |
513 | " [" |
514 | " {" |
515 | " \"tag1\":" |
516 | " [" |
517 | " {" |
518 | " \"val1\":123," |
519 | " \"val2\":\"abc\"," |
520 | " \"val3\":42.123" |
521 | " }," |
522 | " {" |
523 | " \"val1\":987," |
524 | " \"val2\":\"xyz\"," |
525 | " \"val3\":24.321" |
526 | " }" |
527 | " ]," |
528 | " \"tag2\":" |
529 | " [" |
530 | " {" |
531 | " \"val1\":42.123," |
532 | " \"val2\":123," |
533 | " \"val3\":\"abc\"" |
534 | " }," |
535 | " {" |
536 | " \"val1\":24.321," |
537 | " \"val2\":987," |
538 | " \"val3\":\"xyz\"" |
539 | " }" |
540 | " ]" |
541 | " }" |
542 | " ]" |
543 | "}\n" ; |
544 | sock.sendBytes(data2.data(), static_cast<int>(data2.size())); |
545 | Thread::sleep(500); |
546 | reactor.stop(); |
547 | thread.join(); |
548 | |
549 | assertTrue (DataServiceHandler::_data.size() == 4); |
550 | data0.erase(data0.size() - 1); |
551 | assertTrue (DataServiceHandler::_data[0] == data0); |
552 | data1.erase(data1.size() - 1); |
553 | assertTrue (DataServiceHandler::_data[1] == data1); |
554 | data2.erase(data2.size() - 1); |
555 | assertTrue (DataServiceHandler::_data[2] == data2); |
556 | assertTrue (DataServiceHandler::_data[3].empty()); |
557 | } |
558 | |
559 | |
560 | void SocketReactorTest::setUp() |
561 | { |
562 | ClientServiceHandler::setCloseOnTimeout(false); |
563 | } |
564 | |
565 | |
566 | void SocketReactorTest::tearDown() |
567 | { |
568 | } |
569 | |
570 | |
571 | CppUnit::Test* SocketReactorTest::suite() |
572 | { |
573 | CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest" ); |
574 | |
575 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor); |
576 | CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor); |
577 | CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); |
578 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); |
579 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); |
580 | CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); |
581 | |
582 | return pSuite; |
583 | } |
584 | |