1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32
33#include <limits.h>
34#include <string.h>
35
36#ifndef ZMQ_HAVE_WINDOWS
37#include <unistd.h>
38#endif
39
40#include <new>
41#include <sstream>
42
43#include "stream_engine_base.hpp"
44#include "io_thread.hpp"
45#include "session_base.hpp"
46#include "v1_encoder.hpp"
47#include "v1_decoder.hpp"
48#include "v2_encoder.hpp"
49#include "v2_decoder.hpp"
50#include "null_mechanism.hpp"
51#include "plain_client.hpp"
52#include "plain_server.hpp"
53#include "gssapi_client.hpp"
54#include "gssapi_server.hpp"
55#include "curve_client.hpp"
56#include "curve_server.hpp"
57#include "raw_decoder.hpp"
58#include "raw_encoder.hpp"
59#include "config.hpp"
60#include "err.hpp"
61#include "ip.hpp"
62#include "tcp.hpp"
63#include "likely.hpp"
64#include "wire.hpp"
65
66static std::string get_peer_address (zmq::fd_t s_)
67{
68 std::string peer_address;
69
70 const int family = zmq::get_peer_ip_address (s_, peer_address);
71 if (family == 0)
72 peer_address.clear ();
73#if defined ZMQ_HAVE_SO_PEERCRED
74 else if (family == PF_UNIX) {
75 struct ucred cred;
76 socklen_t size = sizeof (cred);
77 if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
78 std::ostringstream buf;
79 buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
80 peer_address += buf.str ();
81 }
82 }
83#elif defined ZMQ_HAVE_LOCAL_PEERCRED
84 else if (family == PF_UNIX) {
85 struct xucred cred;
86 socklen_t size = sizeof (cred);
87 if (!getsockopt (_s, 0, LOCAL_PEERCRED, &cred, &size)
88 && cred.cr_version == XUCRED_VERSION) {
89 std::ostringstream buf;
90 buf << ":" << cred.cr_uid << ":";
91 if (cred.cr_ngroups > 0)
92 buf << cred.cr_groups[0];
93 buf << ":";
94 _peer_address += buf.str ();
95 }
96 }
97#endif
98
99 return peer_address;
100}
101
102zmq::stream_engine_base_t::stream_engine_base_t (
103 fd_t fd_,
104 const options_t &options_,
105 const endpoint_uri_pair_t &endpoint_uri_pair_) :
106 _options (options_),
107 _inpos (NULL),
108 _insize (0),
109 _decoder (NULL),
110 _outpos (NULL),
111 _outsize (0),
112 _encoder (NULL),
113 _mechanism (NULL),
114 _next_msg (NULL),
115 _process_msg (NULL),
116 _metadata (NULL),
117 _input_stopped (false),
118 _output_stopped (false),
119 _endpoint_uri_pair (endpoint_uri_pair_),
120 _has_handshake_timer (false),
121 _has_ttl_timer (false),
122 _has_timeout_timer (false),
123 _has_heartbeat_timer (false),
124 _peer_address (get_peer_address (fd_)),
125 _s (fd_),
126 _handle (static_cast<handle_t> (NULL)),
127 _plugged (false),
128 _handshaking (true),
129 _io_error (false),
130 _session (NULL),
131 _socket (NULL)
132{
133 int rc = _tx_msg.init ();
134 errno_assert (rc == 0);
135
136 // Put the socket into non-blocking mode.
137 unblock_socket (_s);
138}
139
140zmq::stream_engine_base_t::~stream_engine_base_t ()
141{
142 zmq_assert (!_plugged);
143
144 if (_s != retired_fd) {
145#ifdef ZMQ_HAVE_WINDOWS
146 int rc = closesocket (_s);
147 wsa_assert (rc != SOCKET_ERROR);
148#else
149 int rc = close (_s);
150#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
151 // FreeBSD may return ECONNRESET on close() under load but this is not
152 // an error.
153 if (rc == -1 && errno == ECONNRESET)
154 rc = 0;
155#endif
156 errno_assert (rc == 0);
157#endif
158 _s = retired_fd;
159 }
160
161 int rc = _tx_msg.close ();
162 errno_assert (rc == 0);
163
164 // Drop reference to metadata and destroy it if we are
165 // the only user.
166 if (_metadata != NULL) {
167 if (_metadata->drop_ref ()) {
168 LIBZMQ_DELETE (_metadata);
169 }
170 }
171
172 LIBZMQ_DELETE (_encoder);
173 LIBZMQ_DELETE (_decoder);
174 LIBZMQ_DELETE (_mechanism);
175}
176
177void zmq::stream_engine_base_t::plug (io_thread_t *io_thread_,
178 session_base_t *session_)
179{
180 zmq_assert (!_plugged);
181 _plugged = true;
182
183 // Connect to session object.
184 zmq_assert (!_session);
185 zmq_assert (session_);
186 _session = session_;
187 _socket = _session->get_socket ();
188
189 // Connect to I/O threads poller object.
190 io_object_t::plug (io_thread_);
191 _handle = add_fd (_s);
192 _io_error = false;
193
194 plug_internal ();
195}
196
197void zmq::stream_engine_base_t::unplug ()
198{
199 zmq_assert (_plugged);
200 _plugged = false;
201
202 // Cancel all timers.
203 if (_has_handshake_timer) {
204 cancel_timer (handshake_timer_id);
205 _has_handshake_timer = false;
206 }
207
208 if (_has_ttl_timer) {
209 cancel_timer (heartbeat_ttl_timer_id);
210 _has_ttl_timer = false;
211 }
212
213 if (_has_timeout_timer) {
214 cancel_timer (heartbeat_timeout_timer_id);
215 _has_timeout_timer = false;
216 }
217
218 if (_has_heartbeat_timer) {
219 cancel_timer (heartbeat_ivl_timer_id);
220 _has_heartbeat_timer = false;
221 }
222 // Cancel all fd subscriptions.
223 if (!_io_error)
224 rm_fd (_handle);
225
226 // Disconnect from I/O threads poller object.
227 io_object_t::unplug ();
228
229 _session = NULL;
230}
231
232void zmq::stream_engine_base_t::terminate ()
233{
234 unplug ();
235 delete this;
236}
237
238void zmq::stream_engine_base_t::in_event ()
239{
240 // ignore errors
241 const bool res = in_event_internal ();
242 LIBZMQ_UNUSED (res);
243}
244
245bool zmq::stream_engine_base_t::in_event_internal ()
246{
247 zmq_assert (!_io_error);
248
249 // If still handshaking, receive and process the greeting message.
250 if (unlikely (_handshaking)) {
251 if (handshake ()) {
252 // Handshaking was successful.
253 // Switch into the normal message flow.
254 _handshaking = false;
255 } else
256 return false;
257 }
258
259
260 zmq_assert (_decoder);
261
262 // If there has been an I/O error, stop polling.
263 if (_input_stopped) {
264 rm_fd (_handle);
265 _io_error = true;
266 return true; // TODO or return false in this case too?
267 }
268
269 // If there's no data to process in the buffer...
270 if (!_insize) {
271 // Retrieve the buffer and read as much data as possible.
272 // Note that buffer can be arbitrarily large. However, we assume
273 // the underlying TCP layer has fixed buffer size and thus the
274 // number of bytes read will be always limited.
275 size_t bufsize = 0;
276 _decoder->get_buffer (&_inpos, &bufsize);
277
278 int rc = read (_inpos, bufsize);
279
280 if (rc == -1) {
281 if (errno != EAGAIN) {
282 error (connection_error);
283 return false;
284 }
285 return true;
286 }
287
288 // Adjust input size
289 _insize = static_cast<size_t> (rc);
290 // Adjust buffer size to received bytes
291 _decoder->resize_buffer (_insize);
292 }
293
294 int rc = 0;
295 size_t processed = 0;
296
297 while (_insize > 0) {
298 rc = _decoder->decode (_inpos, _insize, processed);
299 zmq_assert (processed <= _insize);
300 _inpos += processed;
301 _insize -= processed;
302 if (rc == 0 || rc == -1)
303 break;
304 rc = (this->*_process_msg) (_decoder->msg ());
305 if (rc == -1)
306 break;
307 }
308
309 // Tear down the connection if we have failed to decode input data
310 // or the session has rejected the message.
311 if (rc == -1) {
312 if (errno != EAGAIN) {
313 error (protocol_error);
314 return false;
315 }
316 _input_stopped = true;
317 reset_pollin (_handle);
318 }
319
320 _session->flush ();
321 return true;
322}
323
324void zmq::stream_engine_base_t::out_event ()
325{
326 zmq_assert (!_io_error);
327
328 // If write buffer is empty, try to read new data from the encoder.
329 if (!_outsize) {
330 // Even when we stop polling as soon as there is no
331 // data to send, the poller may invoke out_event one
332 // more time due to 'speculative write' optimisation.
333 if (unlikely (_encoder == NULL)) {
334 zmq_assert (_handshaking);
335 return;
336 }
337
338 _outpos = NULL;
339 _outsize = _encoder->encode (&_outpos, 0);
340
341 while (_outsize < static_cast<size_t> (_options.out_batch_size)) {
342 if ((this->*_next_msg) (&_tx_msg) == -1)
343 break;
344 _encoder->load_msg (&_tx_msg);
345 unsigned char *bufptr = _outpos + _outsize;
346 size_t n =
347 _encoder->encode (&bufptr, _options.out_batch_size - _outsize);
348 zmq_assert (n > 0);
349 if (_outpos == NULL)
350 _outpos = bufptr;
351 _outsize += n;
352 }
353
354 // If there is no data to send, stop polling for output.
355 if (_outsize == 0) {
356 _output_stopped = true;
357 reset_pollout ();
358 return;
359 }
360 }
361
362 // If there are any data to write in write buffer, write as much as
363 // possible to the socket. Note that amount of data to write can be
364 // arbitrarily large. However, we assume that underlying TCP layer has
365 // limited transmission buffer and thus the actual number of bytes
366 // written should be reasonably modest.
367 const int nbytes = write (_outpos, _outsize);
368
369 // IO error has occurred. We stop waiting for output events.
370 // The engine is not terminated until we detect input error;
371 // this is necessary to prevent losing incoming messages.
372 if (nbytes == -1) {
373 reset_pollout ();
374 return;
375 }
376
377 _outpos += nbytes;
378 _outsize -= nbytes;
379
380 // If we are still handshaking and there are no data
381 // to send, stop polling for output.
382 if (unlikely (_handshaking))
383 if (_outsize == 0)
384 reset_pollout ();
385}
386
387void zmq::stream_engine_base_t::restart_output ()
388{
389 if (unlikely (_io_error))
390 return;
391
392 if (likely (_output_stopped)) {
393 set_pollout ();
394 _output_stopped = false;
395 }
396
397 // Speculative write: The assumption is that at the moment new message
398 // was sent by the user the socket is probably available for writing.
399 // Thus we try to write the data to socket avoiding polling for POLLOUT.
400 // Consequently, the latency should be better in request/reply scenarios.
401 out_event ();
402}
403
404bool zmq::stream_engine_base_t::restart_input ()
405{
406 zmq_assert (_input_stopped);
407 zmq_assert (_session != NULL);
408 zmq_assert (_decoder != NULL);
409
410 int rc = (this->*_process_msg) (_decoder->msg ());
411 if (rc == -1) {
412 if (errno == EAGAIN)
413 _session->flush ();
414 else {
415 error (protocol_error);
416 return false;
417 }
418 return true;
419 }
420
421 while (_insize > 0) {
422 size_t processed = 0;
423 rc = _decoder->decode (_inpos, _insize, processed);
424 zmq_assert (processed <= _insize);
425 _inpos += processed;
426 _insize -= processed;
427 if (rc == 0 || rc == -1)
428 break;
429 rc = (this->*_process_msg) (_decoder->msg ());
430 if (rc == -1)
431 break;
432 }
433
434 if (rc == -1 && errno == EAGAIN)
435 _session->flush ();
436 else if (_io_error) {
437 error (connection_error);
438 return false;
439 } else if (rc == -1) {
440 error (protocol_error);
441 return false;
442 }
443
444 else {
445 _input_stopped = false;
446 set_pollin ();
447 _session->flush ();
448
449 // Speculative read.
450 if (!in_event_internal ())
451 return false;
452 }
453
454 return true;
455}
456
457int zmq::stream_engine_base_t::next_handshake_command (msg_t *msg_)
458{
459 zmq_assert (_mechanism != NULL);
460
461 if (_mechanism->status () == mechanism_t::ready) {
462 mechanism_ready ();
463 return pull_and_encode (msg_);
464 }
465 if (_mechanism->status () == mechanism_t::error) {
466 errno = EPROTO;
467 return -1;
468 }
469 const int rc = _mechanism->next_handshake_command (msg_);
470
471 if (rc == 0)
472 msg_->set_flags (msg_t::command);
473
474 return rc;
475}
476
477int zmq::stream_engine_base_t::process_handshake_command (msg_t *msg_)
478{
479 zmq_assert (_mechanism != NULL);
480 const int rc = _mechanism->process_handshake_command (msg_);
481 if (rc == 0) {
482 if (_mechanism->status () == mechanism_t::ready)
483 mechanism_ready ();
484 else if (_mechanism->status () == mechanism_t::error) {
485 errno = EPROTO;
486 return -1;
487 }
488 if (_output_stopped)
489 restart_output ();
490 }
491
492 return rc;
493}
494
495void zmq::stream_engine_base_t::zap_msg_available ()
496{
497 zmq_assert (_mechanism != NULL);
498
499 const int rc = _mechanism->zap_msg_available ();
500 if (rc == -1) {
501 error (protocol_error);
502 return;
503 }
504 if (_input_stopped)
505 if (!restart_input ())
506 return;
507 if (_output_stopped)
508 restart_output ();
509}
510
511const zmq::endpoint_uri_pair_t &zmq::stream_engine_base_t::get_endpoint () const
512{
513 return _endpoint_uri_pair;
514}
515
516void zmq::stream_engine_base_t::mechanism_ready ()
517{
518 if (_options.heartbeat_interval > 0 && !_has_heartbeat_timer) {
519 add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
520 _has_heartbeat_timer = true;
521 }
522
523 bool flush_session = false;
524
525 if (_options.recv_routing_id) {
526 msg_t routing_id;
527 _mechanism->peer_routing_id (&routing_id);
528 const int rc = _session->push_msg (&routing_id);
529 if (rc == -1 && errno == EAGAIN) {
530 // If the write is failing at this stage with
531 // an EAGAIN the pipe must be being shut down,
532 // so we can just bail out of the routing id set.
533 return;
534 }
535 errno_assert (rc == 0);
536 flush_session = true;
537 }
538
539 if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
540 msg_t connect_notification;
541 connect_notification.init ();
542 const int rc = _session->push_msg (&connect_notification);
543 if (rc == -1 && errno == EAGAIN) {
544 // If the write is failing at this stage with
545 // an EAGAIN the pipe must be being shut down,
546 // so we can just bail out of the notification.
547 return;
548 }
549 errno_assert (rc == 0);
550 flush_session = true;
551 }
552
553 if (flush_session)
554 _session->flush ();
555
556 _next_msg = &stream_engine_base_t::pull_and_encode;
557 _process_msg = &stream_engine_base_t::write_credential;
558
559 // Compile metadata.
560 properties_t properties;
561 init_properties (properties);
562
563 // Add ZAP properties.
564 const properties_t &zap_properties = _mechanism->get_zap_properties ();
565 properties.insert (zap_properties.begin (), zap_properties.end ());
566
567 // Add ZMTP properties.
568 const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
569 properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
570
571 zmq_assert (_metadata == NULL);
572 if (!properties.empty ()) {
573 _metadata = new (std::nothrow) metadata_t (properties);
574 alloc_assert (_metadata);
575 }
576
577 _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
578}
579
580int zmq::stream_engine_base_t::write_credential (msg_t *msg_)
581{
582 zmq_assert (_mechanism != NULL);
583 zmq_assert (_session != NULL);
584
585 const blob_t &credential = _mechanism->get_user_id ();
586 if (credential.size () > 0) {
587 msg_t msg;
588 int rc = msg.init_size (credential.size ());
589 zmq_assert (rc == 0);
590 memcpy (msg.data (), credential.data (), credential.size ());
591 msg.set_flags (msg_t::credential);
592 rc = _session->push_msg (&msg);
593 if (rc == -1) {
594 rc = msg.close ();
595 errno_assert (rc == 0);
596 return -1;
597 }
598 }
599 _process_msg = &stream_engine_base_t::decode_and_push;
600 return decode_and_push (msg_);
601}
602
603int zmq::stream_engine_base_t::pull_and_encode (msg_t *msg_)
604{
605 zmq_assert (_mechanism != NULL);
606
607 if (_session->pull_msg (msg_) == -1)
608 return -1;
609 if (_mechanism->encode (msg_) == -1)
610 return -1;
611 return 0;
612}
613
614int zmq::stream_engine_base_t::decode_and_push (msg_t *msg_)
615{
616 zmq_assert (_mechanism != NULL);
617
618 if (_mechanism->decode (msg_) == -1)
619 return -1;
620
621 if (_has_timeout_timer) {
622 _has_timeout_timer = false;
623 cancel_timer (heartbeat_timeout_timer_id);
624 }
625
626 if (_has_ttl_timer) {
627 _has_ttl_timer = false;
628 cancel_timer (heartbeat_ttl_timer_id);
629 }
630
631 if (msg_->flags () & msg_t::command) {
632 process_command_message (msg_);
633 }
634
635 if (_metadata)
636 msg_->set_metadata (_metadata);
637 if (_session->push_msg (msg_) == -1) {
638 if (errno == EAGAIN)
639 _process_msg = &stream_engine_base_t::push_one_then_decode_and_push;
640 return -1;
641 }
642 return 0;
643}
644
645int zmq::stream_engine_base_t::push_one_then_decode_and_push (msg_t *msg_)
646{
647 const int rc = _session->push_msg (msg_);
648 if (rc == 0)
649 _process_msg = &stream_engine_base_t::decode_and_push;
650 return rc;
651}
652
653int zmq::stream_engine_base_t::pull_msg_from_session (msg_t *msg_)
654{
655 return _session->pull_msg (msg_);
656}
657
658int zmq::stream_engine_base_t::push_msg_to_session (msg_t *msg_)
659{
660 return _session->push_msg (msg_);
661}
662
663void zmq::stream_engine_base_t::error (error_reason_t reason_)
664{
665 zmq_assert (_session);
666
667 if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) {
668 // For router sockets with disconnect notification, rollback
669 // any incomplete message in the pipe, and push the disconnect
670 // notification message.
671 _session->rollback ();
672
673 msg_t disconnect_notification;
674 disconnect_notification.init ();
675 _session->push_msg (&disconnect_notification);
676 }
677
678 // protocol errors have been signaled already at the point where they occurred
679 if (reason_ != protocol_error
680 && (_mechanism == NULL
681 || _mechanism->status () == mechanism_t::handshaking)) {
682 int err = errno;
683 _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
684 }
685
686 _socket->event_disconnected (_endpoint_uri_pair, _s);
687 _session->flush ();
688 _session->engine_error (reason_);
689 unplug ();
690 delete this;
691}
692
693void zmq::stream_engine_base_t::set_handshake_timer ()
694{
695 zmq_assert (!_has_handshake_timer);
696
697 if (_options.handshake_ivl > 0) {
698 add_timer (_options.handshake_ivl, handshake_timer_id);
699 _has_handshake_timer = true;
700 }
701}
702
703bool zmq::stream_engine_base_t::init_properties (properties_t &properties_)
704{
705 if (_peer_address.empty ())
706 return false;
707 properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
708 std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);
709
710 // Private property to support deprecated SRCFD
711 std::ostringstream stream;
712 stream << static_cast<int> (_s);
713 std::string fd_string = stream.str ();
714 properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
715 ZMQ_MOVE (fd_string));
716 return true;
717}
718
719void zmq::stream_engine_base_t::timer_event (int id_)
720{
721 if (id_ == handshake_timer_id) {
722 _has_handshake_timer = false;
723 // handshake timer expired before handshake completed, so engine fail
724 error (timeout_error);
725 } else if (id_ == heartbeat_ivl_timer_id) {
726 _next_msg = &stream_engine_base_t::produce_ping_message;
727 out_event ();
728 add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
729 } else if (id_ == heartbeat_ttl_timer_id) {
730 _has_ttl_timer = false;
731 error (timeout_error);
732 } else if (id_ == heartbeat_timeout_timer_id) {
733 _has_timeout_timer = false;
734 error (timeout_error);
735 } else
736 // There are no other valid timer ids!
737 assert (false);
738}
739
740int zmq::stream_engine_base_t::read (void *data_, size_t size_)
741{
742 int rc = zmq::tcp_read (_s, data_, size_);
743
744 if (rc == 0) {
745 // connection closed by peer
746 errno = EPIPE;
747 return -1;
748 }
749
750 return rc;
751}
752
753int zmq::stream_engine_base_t::write (const void *data_, size_t size_)
754{
755 return zmq::tcp_write (_s, data_, size_);
756}
757