| 1 | // |
| 2 | // SocketReactorTest.cpp |
| 3 | // |
| 4 | // Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH. |
| 5 | // and Contributors. |
| 6 | // |
| 7 | // SPDX-License-Identifier: BSL-1.0 |
| 8 | // |
| 9 | |
| 10 | |
| 11 | #include "SocketReactorTest.h" |
| 12 | #include "Poco/CppUnit/TestCaller.h" |
| 13 | #include "Poco/CppUnit/TestSuite.h" |
| 14 | #include "Poco/Net/SocketReactor.h" |
| 15 | #include "Poco/Net/SocketNotification.h" |
| 16 | #include "Poco/Net/SocketConnector.h" |
| 17 | #include "Poco/Net/SocketAcceptor.h" |
| 18 | #include "Poco/Net/ParallelSocketAcceptor.h" |
| 19 | #include "Poco/Net/StreamSocket.h" |
| 20 | #include "Poco/Net/ServerSocket.h" |
| 21 | #include "Poco/Net/SocketAddress.h" |
| 22 | #include "Poco/Observer.h" |
| 23 | #include "Poco/Exception.h" |
| 24 | #include "Poco/Thread.h" |
| 25 | #include <sstream> |
| 26 | |
| 27 | |
| 28 | using Poco::Net::SocketReactor; |
| 29 | using Poco::Net::SocketConnector; |
| 30 | using Poco::Net::SocketAcceptor; |
| 31 | using Poco::Net::ParallelSocketAcceptor; |
| 32 | using Poco::Net::StreamSocket; |
| 33 | using Poco::Net::ServerSocket; |
| 34 | using Poco::Net::SocketAddress; |
| 35 | using Poco::Net::SocketNotification; |
| 36 | using Poco::Net::ReadableNotification; |
| 37 | using Poco::Net::WritableNotification; |
| 38 | using Poco::Net::TimeoutNotification; |
| 39 | using Poco::Net::ShutdownNotification; |
| 40 | using Poco::Observer; |
| 41 | using Poco::IllegalStateException; |
| 42 | using Poco::Thread; |
| 43 | |
| 44 | |
| 45 | namespace |
| 46 | { |
| 47 | class EchoServiceHandler |
| 48 | { |
| 49 | public: |
| 50 | EchoServiceHandler(const StreamSocket& socket, SocketReactor& reactor): |
| 51 | _socket(socket), |
| 52 | _reactor(reactor) |
| 53 | { |
| 54 | _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); |
| 55 | } |
| 56 | |
| 57 | ~EchoServiceHandler() |
| 58 | { |
| 59 | } |
| 60 | |
| 61 | void onReadable(ReadableNotification* pNf) |
| 62 | { |
| 63 | pNf->release(); |
| 64 | char buffer[8]; |
| 65 | int n = _socket.receiveBytes(buffer, sizeof(buffer)); |
| 66 | if (n > 0) |
| 67 | { |
| 68 | _socket.sendBytes(buffer, n); |
| 69 | } |
| 70 | else |
| 71 | { |
| 72 | _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); |
| 73 | delete this; |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | private: |
| 78 | StreamSocket _socket; |
| 79 | SocketReactor& _reactor; |
| 80 | }; |
| 81 | |
| 82 | class ClientServiceHandler |
| 83 | { |
| 84 | public: |
| 85 | ClientServiceHandler(const StreamSocket& socket, SocketReactor& reactor): |
| 86 | _socket(socket), |
| 87 | _reactor(reactor), |
| 88 | _or(*this, &ClientServiceHandler::onReadable), |
| 89 | _ow(*this, &ClientServiceHandler::onWritable), |
| 90 | _ot(*this, &ClientServiceHandler::onTimeout) |
| 91 | { |
| 92 | _timeout = false; |
| 93 | _readableError = false; |
| 94 | _writableError = false; |
| 95 | _timeoutError = false; |
| 96 | checkReadableObserverCount(0); |
| 97 | _reactor.addEventHandler(_socket, _or); |
| 98 | checkReadableObserverCount(1); |
| 99 | checkWritableObserverCount(0); |
| 100 | _reactor.addEventHandler(_socket, _ow); |
| 101 | checkWritableObserverCount(1); |
| 102 | checkTimeoutObserverCount(0); |
| 103 | _reactor.addEventHandler(_socket, _ot); |
| 104 | checkTimeoutObserverCount(1); |
| 105 | } |
| 106 | |
| 107 | ~ClientServiceHandler() |
| 108 | { |
| 109 | } |
| 110 | |
| 111 | void onReadable(ReadableNotification* pNf) |
| 112 | { |
| 113 | pNf->release(); |
| 114 | char buffer[32]; |
| 115 | int n = _socket.receiveBytes(buffer, sizeof(buffer)); |
| 116 | if (n > 0) |
| 117 | { |
| 118 | _str.write(buffer, n); |
| 119 | _data += _str.str(); |
| 120 | _str.str("" ); |
| 121 | } |
| 122 | else |
| 123 | { |
| 124 | checkReadableObserverCount(1); |
| 125 | _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); |
| 126 | checkReadableObserverCount(0); |
| 127 | if (_once || _data.size() == 8192) |
| 128 | { |
| 129 | _reactor.stop(); |
| 130 | delete this; |
| 131 | } |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | void onWritable(WritableNotification* pNf) |
| 136 | { |
| 137 | pNf->release(); |
| 138 | checkWritableObserverCount(1); |
| 139 | _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable)); |
| 140 | checkWritableObserverCount(0); |
| 141 | std::string data(1024, 'x'); |
| 142 | _socket.sendBytes(data.data(), (int) data.length()); |
| 143 | _socket.shutdownSend(); |
| 144 | } |
| 145 | |
| 146 | void onTimeout(TimeoutNotification* pNf) |
| 147 | { |
| 148 | pNf->release(); |
| 149 | _timeout = true; |
| 150 | if (_closeOnTimeout) |
| 151 | { |
| 152 | _reactor.stop(); |
| 153 | delete this; |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | static std::string data() |
| 158 | { |
| 159 | return _data; |
| 160 | } |
| 161 | |
| 162 | static void resetData() |
| 163 | { |
| 164 | _data.clear(); |
| 165 | } |
| 166 | |
| 167 | static bool timeout() |
| 168 | { |
| 169 | return _timeout; |
| 170 | } |
| 171 | |
| 172 | static bool getCloseOnTimeout() |
| 173 | { |
| 174 | return _closeOnTimeout; |
| 175 | } |
| 176 | |
| 177 | static void setCloseOnTimeout(bool flag) |
| 178 | { |
| 179 | _closeOnTimeout = flag; |
| 180 | } |
| 181 | |
| 182 | static bool readableError() |
| 183 | { |
| 184 | return _readableError; |
| 185 | } |
| 186 | |
| 187 | static bool writableError() |
| 188 | { |
| 189 | return _writableError; |
| 190 | } |
| 191 | |
| 192 | static bool timeoutError() |
| 193 | { |
| 194 | return _timeoutError; |
| 195 | } |
| 196 | |
| 197 | static void setOnce(bool once = true) |
| 198 | { |
| 199 | _once = once; |
| 200 | } |
| 201 | |
| 202 | private: |
| 203 | void checkReadableObserverCount(std::size_t oro) |
| 204 | { |
| 205 | if (((oro == 0) && _reactor.hasEventHandler(_socket, _or)) || |
| 206 | ((oro > 0) && !_reactor.hasEventHandler(_socket, _or))) |
| 207 | { |
| 208 | _readableError = true; |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | void checkWritableObserverCount(std::size_t ow) |
| 213 | { |
| 214 | if (((ow == 0) && _reactor.hasEventHandler(_socket, _ow)) || |
| 215 | ((ow > 0) && !_reactor.hasEventHandler(_socket, _ow))) |
| 216 | { |
| 217 | _writableError = true; |
| 218 | } |
| 219 | } |
| 220 | |
| 221 | void checkTimeoutObserverCount(std::size_t ot) |
| 222 | { |
| 223 | if (((ot == 0) && _reactor.hasEventHandler(_socket, _ot)) || |
| 224 | ((ot > 0) && !_reactor.hasEventHandler(_socket, _ot))) |
| 225 | { |
| 226 | _timeoutError = true; |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | StreamSocket _socket; |
| 231 | SocketReactor& _reactor; |
| 232 | Observer<ClientServiceHandler, ReadableNotification> _or; |
| 233 | Observer<ClientServiceHandler, WritableNotification> _ow; |
| 234 | Observer<ClientServiceHandler, TimeoutNotification> _ot; |
| 235 | std::stringstream _str; |
| 236 | static std::string _data; |
| 237 | static bool _readableError; |
| 238 | static bool _writableError; |
| 239 | static bool _timeoutError; |
| 240 | static bool _timeout; |
| 241 | static bool _closeOnTimeout; |
| 242 | static bool _once; |
| 243 | }; |
| 244 | |
| 245 | |
| 246 | std::string ClientServiceHandler::_data; |
| 247 | bool ClientServiceHandler::_readableError = false; |
| 248 | bool ClientServiceHandler::_writableError = false; |
| 249 | bool ClientServiceHandler::_timeoutError = false; |
| 250 | bool ClientServiceHandler::_timeout = false; |
| 251 | bool ClientServiceHandler::_closeOnTimeout = false; |
| 252 | bool ClientServiceHandler::_once = false; |
| 253 | |
| 254 | |
| 255 | class FailConnector: public SocketConnector<ClientServiceHandler> |
| 256 | { |
| 257 | public: |
| 258 | FailConnector(SocketAddress& address, SocketReactor& reactor): |
| 259 | SocketConnector<ClientServiceHandler>(address, reactor), |
| 260 | _failed(false), |
| 261 | _shutdown(false) |
| 262 | { |
| 263 | reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout)); |
| 264 | reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown)); |
| 265 | } |
| 266 | |
| 267 | void onShutdown(ShutdownNotification* pNf) |
| 268 | { |
| 269 | pNf->release(); |
| 270 | _shutdown = true; |
| 271 | } |
| 272 | |
| 273 | void onTimeout(TimeoutNotification* pNf) |
| 274 | { |
| 275 | pNf->release(); |
| 276 | _failed = true; |
| 277 | reactor()->stop(); |
| 278 | } |
| 279 | |
| 280 | void onError(int error) |
| 281 | { |
| 282 | _failed = true; |
| 283 | reactor()->stop(); |
| 284 | } |
| 285 | |
| 286 | bool failed() const |
| 287 | { |
| 288 | return _failed; |
| 289 | } |
| 290 | |
| 291 | bool shutdown() const |
| 292 | { |
| 293 | return _shutdown; |
| 294 | } |
| 295 | |
| 296 | private: |
| 297 | bool _failed; |
| 298 | bool _shutdown; |
| 299 | }; |
| 300 | |
| 301 | class DataServiceHandler |
| 302 | { |
| 303 | public: |
| 304 | typedef std::vector<std::string> Data; |
| 305 | |
| 306 | DataServiceHandler(StreamSocket& socket, SocketReactor& reactor): |
| 307 | _socket(socket), |
| 308 | _reactor(reactor), |
| 309 | _pos(0) |
| 310 | { |
| 311 | _data.resize(1); |
| 312 | _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable)); |
| 313 | _reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown)); |
| 314 | } |
| 315 | |
| 316 | ~DataServiceHandler() |
| 317 | { |
| 318 | _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable)); |
| 319 | _reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown)); |
| 320 | } |
| 321 | |
| 322 | void onReadable(ReadableNotification* pNf) |
| 323 | { |
| 324 | pNf->release(); |
| 325 | char buffer[64]; |
| 326 | int n = _socket.receiveBytes(&buffer[0], sizeof(buffer)); |
| 327 | if (n > 0) |
| 328 | { |
| 329 | _data[_pos].append(buffer, n); |
| 330 | std::size_t pos; |
| 331 | pos = _data[_pos].find('\n'); |
| 332 | if(pos != std::string::npos) |
| 333 | { |
| 334 | if (pos == _data[_pos].size() - 1) |
| 335 | { |
| 336 | _data[_pos].erase(pos, 1); |
| 337 | _data.push_back(std::string()); |
| 338 | } |
| 339 | else |
| 340 | { |
| 341 | _data.push_back(_data[_pos].substr(pos + 1)); |
| 342 | _data[_pos].erase(pos); |
| 343 | } |
| 344 | ++_pos; |
| 345 | } |
| 346 | } |
| 347 | else return; |
| 348 | } |
| 349 | |
| 350 | void onShutdown(ShutdownNotification* pNf) |
| 351 | { |
| 352 | pNf->release(); |
| 353 | delete this; |
| 354 | } |
| 355 | |
| 356 | static Data _data; |
| 357 | |
| 358 | private: |
| 359 | StreamSocket _socket; |
| 360 | SocketReactor& _reactor; |
| 361 | int _pos; |
| 362 | }; |
| 363 | |
| 364 | DataServiceHandler::Data DataServiceHandler::_data; |
| 365 | } |
| 366 | |
| 367 | |
| 368 | SocketReactorTest::SocketReactorTest(const std::string& name): CppUnit::TestCase(name) |
| 369 | { |
| 370 | } |
| 371 | |
| 372 | |
| 373 | SocketReactorTest::~SocketReactorTest() |
| 374 | { |
| 375 | } |
| 376 | |
| 377 | |
| 378 | void SocketReactorTest::testSocketReactor() |
| 379 | { |
| 380 | SocketAddress ssa; |
| 381 | ServerSocket ss(ssa); |
| 382 | SocketReactor reactor; |
| 383 | SocketAcceptor<EchoServiceHandler> acceptor(ss, reactor); |
| 384 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
| 385 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
| 386 | ClientServiceHandler::setOnce(true); |
| 387 | ClientServiceHandler::resetData(); |
| 388 | reactor.run(); |
| 389 | std::string data(ClientServiceHandler::data()); |
| 390 | assertTrue (data.size() == 1024); |
| 391 | assertTrue (!ClientServiceHandler::readableError()); |
| 392 | assertTrue (!ClientServiceHandler::writableError()); |
| 393 | assertTrue (!ClientServiceHandler::timeoutError()); |
| 394 | } |
| 395 | |
| 396 | |
| 397 | void SocketReactorTest::testSetSocketReactor() |
| 398 | { |
| 399 | SocketAddress ssa; |
| 400 | ServerSocket ss(ssa); |
| 401 | SocketReactor reactor; |
| 402 | SocketAcceptor<EchoServiceHandler> acceptor(ss); |
| 403 | acceptor.setReactor(reactor); |
| 404 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
| 405 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
| 406 | ClientServiceHandler::setOnce(true); |
| 407 | ClientServiceHandler::resetData(); |
| 408 | reactor.run(); |
| 409 | std::string data(ClientServiceHandler::data()); |
| 410 | assertTrue (data.size() == 1024); |
| 411 | assertTrue (!ClientServiceHandler::readableError()); |
| 412 | assertTrue (!ClientServiceHandler::writableError()); |
| 413 | assertTrue (!ClientServiceHandler::timeoutError()); |
| 414 | } |
| 415 | |
| 416 | |
| 417 | void SocketReactorTest::testParallelSocketReactor() |
| 418 | { |
| 419 | SocketAddress ssa; |
| 420 | ServerSocket ss(ssa); |
| 421 | SocketReactor reactor; |
| 422 | ParallelSocketAcceptor<EchoServiceHandler, SocketReactor> acceptor(ss, reactor); |
| 423 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
| 424 | SocketConnector<ClientServiceHandler> connector1(sa, reactor); |
| 425 | SocketConnector<ClientServiceHandler> connector2(sa, reactor); |
| 426 | SocketConnector<ClientServiceHandler> connector3(sa, reactor); |
| 427 | SocketConnector<ClientServiceHandler> connector4(sa, reactor); |
| 428 | SocketConnector<ClientServiceHandler> connector5(sa, reactor); |
| 429 | SocketConnector<ClientServiceHandler> connector6(sa, reactor); |
| 430 | SocketConnector<ClientServiceHandler> connector7(sa, reactor); |
| 431 | SocketConnector<ClientServiceHandler> connector8(sa, reactor); |
| 432 | ClientServiceHandler::setOnce(false); |
| 433 | ClientServiceHandler::resetData(); |
| 434 | reactor.run(); |
| 435 | std::string data(ClientServiceHandler::data()); |
| 436 | assertTrue (data.size() == 8192); |
| 437 | assertTrue (!ClientServiceHandler::readableError()); |
| 438 | assertTrue (!ClientServiceHandler::writableError()); |
| 439 | assertTrue (!ClientServiceHandler::timeoutError()); |
| 440 | } |
| 441 | |
| 442 | |
| 443 | void SocketReactorTest::testSocketConnectorFail() |
| 444 | { |
| 445 | SocketReactor reactor; |
| 446 | reactor.setTimeout(Poco::Timespan(3, 0)); |
| 447 | SocketAddress sa("192.168.168.192" , 12345); |
| 448 | FailConnector connector(sa, reactor); |
| 449 | assertTrue (!connector.failed()); |
| 450 | assertTrue (!connector.shutdown()); |
| 451 | reactor.run(); |
| 452 | assertTrue (connector.failed()); |
| 453 | assertTrue (connector.shutdown()); |
| 454 | } |
| 455 | |
| 456 | |
| 457 | void SocketReactorTest::testSocketConnectorTimeout() |
| 458 | { |
| 459 | ClientServiceHandler::setCloseOnTimeout(true); |
| 460 | |
| 461 | SocketAddress ssa; |
| 462 | ServerSocket ss(ssa); |
| 463 | SocketReactor reactor; |
| 464 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
| 465 | SocketConnector<ClientServiceHandler> connector(sa, reactor); |
| 466 | reactor.run(); |
| 467 | assertTrue (ClientServiceHandler::timeout()); |
| 468 | } |
| 469 | |
| 470 | |
| 471 | void SocketReactorTest::testDataCollection() |
| 472 | { |
| 473 | SocketAddress ssa; |
| 474 | ServerSocket ss(ssa); |
| 475 | SocketReactor reactor; |
| 476 | SocketAcceptor<DataServiceHandler> acceptor(ss, reactor); |
| 477 | Thread thread; |
| 478 | thread.start(reactor); |
| 479 | |
| 480 | SocketAddress sa("127.0.0.1" , ss.address().port()); |
| 481 | StreamSocket sock(sa); |
| 482 | |
| 483 | std::string data0("{" |
| 484 | " \"src\":\"127.0.0.1\"," |
| 485 | " \"id\":\"test0\"," |
| 486 | " \"ts\":\"1524864651000001\"," |
| 487 | " \"data\":123" |
| 488 | "}\n" ); |
| 489 | sock.sendBytes(data0.data(), static_cast<int>(data0.size())); |
| 490 | |
| 491 | std::string data1("{" |
| 492 | " \"src\":\"127.0.0.1\"," |
| 493 | " \"id\":\"test1\"," |
| 494 | " \"ts\":\"1524864651123456\"," |
| 495 | " \"data\":" |
| 496 | " [" |
| 497 | " {" |
| 498 | " \"tag1\":" |
| 499 | " [" |
| 500 | " {\"val1\":123}," |
| 501 | " {\"val2\":\"abc\"}" |
| 502 | " ]" |
| 503 | " }" |
| 504 | " ]" |
| 505 | "}\n" ); |
| 506 | sock.sendBytes(data1.data(), static_cast<int>(data1.size())); |
| 507 | |
| 508 | std::string data2 = "{" |
| 509 | " \"src\":\"127.0.0.1\"," |
| 510 | " \"id\":\"test2\"," |
| 511 | " \"ts\":\"1524864652654321\"," |
| 512 | " \"data\":" |
| 513 | " [" |
| 514 | " {" |
| 515 | " \"tag1\":" |
| 516 | " [" |
| 517 | " {" |
| 518 | " \"val1\":123," |
| 519 | " \"val2\":\"abc\"," |
| 520 | " \"val3\":42.123" |
| 521 | " }," |
| 522 | " {" |
| 523 | " \"val1\":987," |
| 524 | " \"val2\":\"xyz\"," |
| 525 | " \"val3\":24.321" |
| 526 | " }" |
| 527 | " ]," |
| 528 | " \"tag2\":" |
| 529 | " [" |
| 530 | " {" |
| 531 | " \"val1\":42.123," |
| 532 | " \"val2\":123," |
| 533 | " \"val3\":\"abc\"" |
| 534 | " }," |
| 535 | " {" |
| 536 | " \"val1\":24.321," |
| 537 | " \"val2\":987," |
| 538 | " \"val3\":\"xyz\"" |
| 539 | " }" |
| 540 | " ]" |
| 541 | " }" |
| 542 | " ]" |
| 543 | "}\n" ; |
| 544 | sock.sendBytes(data2.data(), static_cast<int>(data2.size())); |
| 545 | Thread::sleep(500); |
| 546 | reactor.stop(); |
| 547 | thread.join(); |
| 548 | |
| 549 | assertTrue (DataServiceHandler::_data.size() == 4); |
| 550 | data0.erase(data0.size() - 1); |
| 551 | assertTrue (DataServiceHandler::_data[0] == data0); |
| 552 | data1.erase(data1.size() - 1); |
| 553 | assertTrue (DataServiceHandler::_data[1] == data1); |
| 554 | data2.erase(data2.size() - 1); |
| 555 | assertTrue (DataServiceHandler::_data[2] == data2); |
| 556 | assertTrue (DataServiceHandler::_data[3].empty()); |
| 557 | } |
| 558 | |
| 559 | |
| 560 | void SocketReactorTest::setUp() |
| 561 | { |
| 562 | ClientServiceHandler::setCloseOnTimeout(false); |
| 563 | } |
| 564 | |
| 565 | |
| 566 | void SocketReactorTest::tearDown() |
| 567 | { |
| 568 | } |
| 569 | |
| 570 | |
| 571 | CppUnit::Test* SocketReactorTest::suite() |
| 572 | { |
| 573 | CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest" ); |
| 574 | |
| 575 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor); |
| 576 | CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor); |
| 577 | CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); |
| 578 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); |
| 579 | CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); |
| 580 | CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); |
| 581 | |
| 582 | return pSuite; |
| 583 | } |
| 584 | |