1/*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32
33#include <limits.h>
34#include <string.h>
35
36#ifndef ZMQ_HAVE_WINDOWS
37#include <unistd.h>
38#endif
39
40#include <new>
41#include <sstream>
42
43#include "zmtp_engine.hpp"
44#include "io_thread.hpp"
45#include "session_base.hpp"
46#include "v1_encoder.hpp"
47#include "v1_decoder.hpp"
48#include "v2_encoder.hpp"
49#include "v2_decoder.hpp"
50#include "null_mechanism.hpp"
51#include "plain_client.hpp"
52#include "plain_server.hpp"
53#include "gssapi_client.hpp"
54#include "gssapi_server.hpp"
55#include "curve_client.hpp"
56#include "curve_server.hpp"
57#include "raw_decoder.hpp"
58#include "raw_encoder.hpp"
59#include "config.hpp"
60#include "err.hpp"
61#include "ip.hpp"
62#include "likely.hpp"
63#include "wire.hpp"
64
65zmq::zmtp_engine_t::zmtp_engine_t (
66 fd_t fd_,
67 const options_t &options_,
68 const endpoint_uri_pair_t &endpoint_uri_pair_) :
69 stream_engine_base_t (fd_, options_, endpoint_uri_pair_),
70 _greeting_size (v2_greeting_size),
71 _greeting_bytes_read (0),
72 _subscription_required (false),
73 _heartbeat_timeout (0)
74{
75 _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
76 &zmtp_engine_t::routing_id_msg);
77 _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
78 &zmtp_engine_t::process_routing_id_msg);
79
80 int rc = _pong_msg.init ();
81 errno_assert (rc == 0);
82
83 rc = _routing_id_msg.init ();
84 errno_assert (rc == 0);
85
86 if (_options.heartbeat_interval > 0) {
87 _heartbeat_timeout = _options.heartbeat_timeout;
88 if (_heartbeat_timeout == -1)
89 _heartbeat_timeout = _options.heartbeat_interval;
90 }
91}
92
93zmq::zmtp_engine_t::~zmtp_engine_t ()
94{
95 int rc = _routing_id_msg.close ();
96 errno_assert (rc == 0);
97}
98
99void zmq::zmtp_engine_t::plug_internal ()
100{
101 // start optional timer, to prevent handshake hanging on no input
102 set_handshake_timer ();
103
104 // Send the 'length' and 'flags' fields of the routing id message.
105 // The 'length' field is encoded in the long format.
106 _outpos = _greeting_send;
107 _outpos[_outsize++] = UCHAR_MAX;
108 put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
109 _outsize += 8;
110 _outpos[_outsize++] = 0x7f;
111
112 set_pollin ();
113 set_pollout ();
114 // Flush all the data that may have been already received downstream.
115 in_event ();
116}
117
118// Position of the revision field in the greeting.
119const size_t revision_pos = 10;
120
121bool zmq::zmtp_engine_t::handshake ()
122{
123 zmq_assert (_greeting_bytes_read < _greeting_size);
124 // Receive the greeting.
125 const int rc = receive_greeting ();
126 if (rc == -1)
127 return false;
128 const bool unversioned = rc != 0;
129
130 if (!(this
131 ->*select_handshake_fun (unversioned,
132 _greeting_recv[revision_pos])) ())
133 return false;
134
135 // Start polling for output if necessary.
136 if (_outsize == 0)
137 set_pollout ();
138
139 if (_has_handshake_timer) {
140 cancel_timer (handshake_timer_id);
141 _has_handshake_timer = false;
142 }
143
144 return true;
145}
146
147int zmq::zmtp_engine_t::receive_greeting ()
148{
149 bool unversioned = false;
150 while (_greeting_bytes_read < _greeting_size) {
151 const int n = read (_greeting_recv + _greeting_bytes_read,
152 _greeting_size - _greeting_bytes_read);
153 if (n == -1) {
154 if (errno != EAGAIN)
155 error (connection_error);
156 return -1;
157 }
158
159 _greeting_bytes_read += n;
160
161 // We have received at least one byte from the peer.
162 // If the first byte is not 0xff, we know that the
163 // peer is using unversioned protocol.
164 if (_greeting_recv[0] != 0xff) {
165 unversioned = true;
166 break;
167 }
168
169 if (_greeting_bytes_read < signature_size)
170 continue;
171
172 // Inspect the right-most bit of the 10th byte (which coincides
173 // with the 'flags' field if a regular message was sent).
174 // Zero indicates this is a header of a routing id message
175 // (i.e. the peer is using the unversioned protocol).
176 if (!(_greeting_recv[9] & 0x01)) {
177 unversioned = true;
178 break;
179 }
180
181 // The peer is using versioned protocol.
182 receive_greeting_versioned ();
183 }
184 return unversioned ? 1 : 0;
185}
186
187void zmq::zmtp_engine_t::receive_greeting_versioned ()
188{
189 // Send the major version number.
190 if (_outpos + _outsize == _greeting_send + signature_size) {
191 if (_outsize == 0)
192 set_pollout ();
193 _outpos[_outsize++] = 3; // Major version number
194 }
195
196 if (_greeting_bytes_read > signature_size) {
197 if (_outpos + _outsize == _greeting_send + signature_size + 1) {
198 if (_outsize == 0)
199 set_pollout ();
200
201 // Use ZMTP/2.0 to talk to older peers.
202 if (_greeting_recv[revision_pos] == ZMTP_1_0
203 || _greeting_recv[revision_pos] == ZMTP_2_0)
204 _outpos[_outsize++] = _options.type;
205 else {
206 _outpos[_outsize++] = 0; // Minor version number
207 memset (_outpos + _outsize, 0, 20);
208
209 zmq_assert (_options.mechanism == ZMQ_NULL
210 || _options.mechanism == ZMQ_PLAIN
211 || _options.mechanism == ZMQ_CURVE
212 || _options.mechanism == ZMQ_GSSAPI);
213
214 if (_options.mechanism == ZMQ_NULL)
215 memcpy (_outpos + _outsize, "NULL", 4);
216 else if (_options.mechanism == ZMQ_PLAIN)
217 memcpy (_outpos + _outsize, "PLAIN", 5);
218 else if (_options.mechanism == ZMQ_GSSAPI)
219 memcpy (_outpos + _outsize, "GSSAPI", 6);
220 else if (_options.mechanism == ZMQ_CURVE)
221 memcpy (_outpos + _outsize, "CURVE", 5);
222 _outsize += 20;
223 memset (_outpos + _outsize, 0, 32);
224 _outsize += 32;
225 _greeting_size = v3_greeting_size;
226 }
227 }
228 }
229}
230
231zmq::zmtp_engine_t::handshake_fun_t
232zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_,
233 unsigned char revision_)
234{
235 // Is the peer using ZMTP/1.0 with no revision number?
236 if (unversioned_) {
237 return &zmtp_engine_t::handshake_v1_0_unversioned;
238 }
239 switch (revision_) {
240 case ZMTP_1_0:
241 return &zmtp_engine_t::handshake_v1_0;
242 case ZMTP_2_0:
243 return &zmtp_engine_t::handshake_v2_0;
244 default:
245 return &zmtp_engine_t::handshake_v3_0;
246 }
247}
248
249bool zmq::zmtp_engine_t::handshake_v1_0_unversioned ()
250{
251 // We send and receive rest of routing id message
252 if (session ()->zap_enabled ()) {
253 // reject ZMTP 1.0 connections if ZAP is enabled
254 error (protocol_error);
255 return false;
256 }
257
258 _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
259 alloc_assert (_encoder);
260
261 _decoder = new (std::nothrow)
262 v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
263 alloc_assert (_decoder);
264
265 // We have already sent the message header.
266 // Since there is no way to tell the encoder to
267 // skip the message header, we simply throw that
268 // header data away.
269 const size_t header_size =
270 _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
271 unsigned char tmp[10], *bufferp = tmp;
272
273 // Prepare the routing id message and load it into encoder.
274 // Then consume bytes we have already sent to the peer.
275 int rc = _routing_id_msg.close ();
276 zmq_assert (rc == 0);
277 rc = _routing_id_msg.init_size (_options.routing_id_size);
278 zmq_assert (rc == 0);
279 memcpy (_routing_id_msg.data (), _options.routing_id,
280 _options.routing_id_size);
281 _encoder->load_msg (&_routing_id_msg);
282 const size_t buffer_size = _encoder->encode (&bufferp, header_size);
283 zmq_assert (buffer_size == header_size);
284
285 // Make sure the decoder sees the data we have already received.
286 _inpos = _greeting_recv;
287 _insize = _greeting_bytes_read;
288
289 // To allow for interoperability with peers that do not forward
290 // their subscriptions, we inject a phantom subscription message
291 // message into the incoming message stream.
292 if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
293 _subscription_required = true;
294
295 // We are sending our routing id now and the next message
296 // will come from the socket.
297 _next_msg = &zmtp_engine_t::pull_msg_from_session;
298
299 // We are expecting routing id message.
300 _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
301 &zmtp_engine_t::process_routing_id_msg);
302
303 return true;
304}
305
306bool zmq::zmtp_engine_t::handshake_v1_0 ()
307{
308 if (session ()->zap_enabled ()) {
309 // reject ZMTP 1.0 connections if ZAP is enabled
310 error (protocol_error);
311 return false;
312 }
313
314 _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
315 alloc_assert (_encoder);
316
317 _decoder = new (std::nothrow)
318 v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
319 alloc_assert (_decoder);
320
321 return true;
322}
323
324bool zmq::zmtp_engine_t::handshake_v2_0 ()
325{
326 if (session ()->zap_enabled ()) {
327 // reject ZMTP 2.0 connections if ZAP is enabled
328 error (protocol_error);
329 return false;
330 }
331
332 _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
333 alloc_assert (_encoder);
334
335 _decoder = new (std::nothrow) v2_decoder_t (
336 _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
337 alloc_assert (_decoder);
338
339 return true;
340}
341
342bool zmq::zmtp_engine_t::handshake_v3_0 ()
343{
344 _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
345 alloc_assert (_encoder);
346
347 _decoder = new (std::nothrow) v2_decoder_t (
348 _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
349 alloc_assert (_decoder);
350
351 if (_options.mechanism == ZMQ_NULL
352 && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
353 20)
354 == 0) {
355 _mechanism = new (std::nothrow)
356 null_mechanism_t (session (), _peer_address, _options);
357 alloc_assert (_mechanism);
358 } else if (_options.mechanism == ZMQ_PLAIN
359 && memcmp (_greeting_recv + 12,
360 "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
361 == 0) {
362 if (_options.as_server)
363 _mechanism = new (std::nothrow)
364 plain_server_t (session (), _peer_address, _options);
365 else
366 _mechanism =
367 new (std::nothrow) plain_client_t (session (), _options);
368 alloc_assert (_mechanism);
369 }
370#ifdef ZMQ_HAVE_CURVE
371 else if (_options.mechanism == ZMQ_CURVE
372 && memcmp (_greeting_recv + 12,
373 "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
374 == 0) {
375 if (_options.as_server)
376 _mechanism = new (std::nothrow)
377 curve_server_t (session (), _peer_address, _options);
378 else
379 _mechanism =
380 new (std::nothrow) curve_client_t (session (), _options);
381 alloc_assert (_mechanism);
382 }
383#endif
384#ifdef HAVE_LIBGSSAPI_KRB5
385 else if (_options.mechanism == ZMQ_GSSAPI
386 && memcmp (_greeting_recv + 12,
387 "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
388 == 0) {
389 if (_options.as_server)
390 _mechanism = new (std::nothrow)
391 gssapi_server_t (session (), _peer_address, _options);
392 else
393 _mechanism =
394 new (std::nothrow) gssapi_client_t (session (), _options);
395 alloc_assert (_mechanism);
396 }
397#endif
398 else {
399 socket ()->event_handshake_failed_protocol (
400 session ()->get_endpoint (),
401 ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
402 error (protocol_error);
403 return false;
404 }
405 _next_msg = &zmtp_engine_t::next_handshake_command;
406 _process_msg = &zmtp_engine_t::process_handshake_command;
407
408 return true;
409}
410
411int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
412{
413 int rc = msg_->init_size (_options.routing_id_size);
414 errno_assert (rc == 0);
415 if (_options.routing_id_size > 0)
416 memcpy (msg_->data (), _options.routing_id, _options.routing_id_size);
417 _next_msg = &zmtp_engine_t::pull_msg_from_session;
418 return 0;
419}
420
421int zmq::zmtp_engine_t::process_routing_id_msg (msg_t *msg_)
422{
423 if (_options.recv_routing_id) {
424 msg_->set_flags (msg_t::routing_id);
425 int rc = session ()->push_msg (msg_);
426 errno_assert (rc == 0);
427 } else {
428 int rc = msg_->close ();
429 errno_assert (rc == 0);
430 rc = msg_->init ();
431 errno_assert (rc == 0);
432 }
433
434 if (_subscription_required) {
435 msg_t subscription;
436
437 // Inject the subscription message, so that also
438 // ZMQ 2.x peers receive published messages.
439 int rc = subscription.init_size (1);
440 errno_assert (rc == 0);
441 *static_cast<unsigned char *> (subscription.data ()) = 1;
442 rc = session ()->push_msg (&subscription);
443 errno_assert (rc == 0);
444 }
445
446 _process_msg = &zmtp_engine_t::push_msg_to_session;
447
448 return 0;
449}
450
451int zmq::zmtp_engine_t::produce_ping_message (msg_t *msg_)
452{
453 // 16-bit TTL + \4PING == 7
454 const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
455 zmq_assert (_mechanism != NULL);
456
457 int rc = msg_->init_size (ping_ttl_len);
458 errno_assert (rc == 0);
459 msg_->set_flags (msg_t::command);
460 // Copy in the command message
461 memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size);
462
463 uint16_t ttl_val = htons (_options.heartbeat_ttl);
464 memcpy (static_cast<uint8_t *> (msg_->data ()) + msg_t::ping_cmd_name_size,
465 &ttl_val, sizeof (ttl_val));
466
467 rc = _mechanism->encode (msg_);
468 _next_msg = &zmtp_engine_t::pull_and_encode;
469 if (!_has_timeout_timer && _heartbeat_timeout > 0) {
470 add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
471 _has_timeout_timer = true;
472 }
473 return rc;
474}
475
476int zmq::zmtp_engine_t::produce_pong_message (msg_t *msg_)
477{
478 zmq_assert (_mechanism != NULL);
479
480 int rc = msg_->move (_pong_msg);
481 errno_assert (rc == 0);
482
483 rc = _mechanism->encode (msg_);
484 _next_msg = &zmtp_engine_t::pull_and_encode;
485 return rc;
486}
487
488int zmq::zmtp_engine_t::process_heartbeat_message (msg_t *msg_)
489{
490 if (msg_->is_ping ()) {
491 // 16-bit TTL + \4PING == 7
492 const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
493 const size_t ping_max_ctx_len = 16;
494 uint16_t remote_heartbeat_ttl;
495
496 // Get the remote heartbeat TTL to setup the timer
497 memcpy (&remote_heartbeat_ttl,
498 static_cast<uint8_t *> (msg_->data ())
499 + msg_t::ping_cmd_name_size,
500 ping_ttl_len - msg_t::ping_cmd_name_size);
501 remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);
502 // The remote heartbeat is in 10ths of a second
503 // so we multiply it by 100 to get the timer interval in ms.
504 remote_heartbeat_ttl *= 100;
505
506 if (!_has_ttl_timer && remote_heartbeat_ttl > 0) {
507 add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id);
508 _has_ttl_timer = true;
509 }
510
511 // As per ZMTP 3.1 the PING command might contain an up to 16 bytes
512 // context which needs to be PONGed back, so build the pong message
513 // here and store it. Truncate it if it's too long.
514 // Given the engine goes straight to out_event, sequential PINGs will
515 // not be a problem.
516 const size_t context_len =
517 std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
518 const int rc =
519 _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
520 errno_assert (rc == 0);
521 _pong_msg.set_flags (msg_t::command);
522 memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
523 if (context_len > 0)
524 memcpy (static_cast<uint8_t *> (_pong_msg.data ())
525 + msg_t::ping_cmd_name_size,
526 static_cast<uint8_t *> (msg_->data ()) + ping_ttl_len,
527 context_len);
528
529 _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
530 &zmtp_engine_t::produce_pong_message);
531 out_event ();
532 }
533
534 return 0;
535}
536
537int zmq::zmtp_engine_t::process_command_message (msg_t *msg_)
538{
539 const uint8_t cmd_name_size =
540 *(static_cast<const uint8_t *> (msg_->data ()));
541 const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
542 const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
543 const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;
544 // Malformed command
545 if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
546 return -1;
547
548 uint8_t *cmd_name = (static_cast<uint8_t *> (msg_->data ())) + 1;
549 if (cmd_name_size == ping_name_size
550 && memcmp (cmd_name, "PING", cmd_name_size) == 0)
551 msg_->set_flags (zmq::msg_t::ping);
552 if (cmd_name_size == ping_name_size
553 && memcmp (cmd_name, "PONG", cmd_name_size) == 0)
554 msg_->set_flags (zmq::msg_t::pong);
555 if (cmd_name_size == sub_name_size
556 && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0)
557 msg_->set_flags (zmq::msg_t::subscribe);
558 if (cmd_name_size == cancel_name_size
559 && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0)
560 msg_->set_flags (zmq::msg_t::cancel);
561
562 if (msg_->is_ping () || msg_->is_pong ())
563 return process_heartbeat_message (msg_);
564
565 return 0;
566}
567