1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "msg.hpp"
33
34#include <string.h>
35#include <stdlib.h>
36#include <new>
37
38#include "stdint.hpp"
39#include "likely.hpp"
40#include "metadata.hpp"
41#include "err.hpp"
42
43// Check whether the sizes of public representation of the message (zmq_msg_t)
44// and private representation of the message (zmq::msg_t) match.
45
46typedef char
47 zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0)
48 - 1];
49
50bool zmq::msg_t::check () const
51{
52 return _u.base.type >= type_min && _u.base.type <= type_max;
53}
54
55int zmq::msg_t::init (void *data_,
56 size_t size_,
57 msg_free_fn *ffn_,
58 void *hint_,
59 content_t *content_)
60{
61 if (size_ < max_vsm_size) {
62 const int rc = init_size (size_);
63
64 if (rc != -1) {
65 memcpy (data (), data_, size_);
66 return 0;
67 }
68 return -1;
69 }
70 if (content_) {
71 return init_external_storage (content_, data_, size_, ffn_, hint_);
72 }
73 return init_data (data_, size_, ffn_, hint_);
74}
75
76int zmq::msg_t::init ()
77{
78 _u.vsm.metadata = NULL;
79 _u.vsm.type = type_vsm;
80 _u.vsm.flags = 0;
81 _u.vsm.size = 0;
82 _u.vsm.group[0] = '\0';
83 _u.vsm.routing_id = 0;
84 return 0;
85}
86
87int zmq::msg_t::init_size (size_t size_)
88{
89 if (size_ <= max_vsm_size) {
90 _u.vsm.metadata = NULL;
91 _u.vsm.type = type_vsm;
92 _u.vsm.flags = 0;
93 _u.vsm.size = static_cast<unsigned char> (size_);
94 _u.vsm.group[0] = '\0';
95 _u.vsm.routing_id = 0;
96 } else {
97 _u.lmsg.metadata = NULL;
98 _u.lmsg.type = type_lmsg;
99 _u.lmsg.flags = 0;
100 _u.lmsg.group[0] = '\0';
101 _u.lmsg.routing_id = 0;
102 _u.lmsg.content = NULL;
103 if (sizeof (content_t) + size_ > size_)
104 _u.lmsg.content =
105 static_cast<content_t *> (malloc (sizeof (content_t) + size_));
106 if (unlikely (!_u.lmsg.content)) {
107 errno = ENOMEM;
108 return -1;
109 }
110
111 _u.lmsg.content->data = _u.lmsg.content + 1;
112 _u.lmsg.content->size = size_;
113 _u.lmsg.content->ffn = NULL;
114 _u.lmsg.content->hint = NULL;
115 new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
116 }
117 return 0;
118}
119
120int zmq::msg_t::init_external_storage (content_t *content_,
121 void *data_,
122 size_t size_,
123 msg_free_fn *ffn_,
124 void *hint_)
125{
126 zmq_assert (NULL != data_);
127 zmq_assert (NULL != content_);
128
129 _u.zclmsg.metadata = NULL;
130 _u.zclmsg.type = type_zclmsg;
131 _u.zclmsg.flags = 0;
132 _u.zclmsg.group[0] = '\0';
133 _u.zclmsg.routing_id = 0;
134
135 _u.zclmsg.content = content_;
136 _u.zclmsg.content->data = data_;
137 _u.zclmsg.content->size = size_;
138 _u.zclmsg.content->ffn = ffn_;
139 _u.zclmsg.content->hint = hint_;
140 new (&_u.zclmsg.content->refcnt) zmq::atomic_counter_t ();
141
142 return 0;
143}
144
145int zmq::msg_t::init_data (void *data_,
146 size_t size_,
147 msg_free_fn *ffn_,
148 void *hint_)
149{
150 // If data is NULL and size is not 0, a segfault
151 // would occur once the data is accessed
152 zmq_assert (data_ != NULL || size_ == 0);
153
154 // Initialize constant message if there's no need to deallocate
155 if (ffn_ == NULL) {
156 _u.cmsg.metadata = NULL;
157 _u.cmsg.type = type_cmsg;
158 _u.cmsg.flags = 0;
159 _u.cmsg.data = data_;
160 _u.cmsg.size = size_;
161 _u.cmsg.group[0] = '\0';
162 _u.cmsg.routing_id = 0;
163 } else {
164 _u.lmsg.metadata = NULL;
165 _u.lmsg.type = type_lmsg;
166 _u.lmsg.flags = 0;
167 _u.lmsg.group[0] = '\0';
168 _u.lmsg.routing_id = 0;
169 _u.lmsg.content =
170 static_cast<content_t *> (malloc (sizeof (content_t)));
171 if (!_u.lmsg.content) {
172 errno = ENOMEM;
173 return -1;
174 }
175
176 _u.lmsg.content->data = data_;
177 _u.lmsg.content->size = size_;
178 _u.lmsg.content->ffn = ffn_;
179 _u.lmsg.content->hint = hint_;
180 new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
181 }
182 return 0;
183}
184
185int zmq::msg_t::init_delimiter ()
186{
187 _u.delimiter.metadata = NULL;
188 _u.delimiter.type = type_delimiter;
189 _u.delimiter.flags = 0;
190 _u.delimiter.group[0] = '\0';
191 _u.delimiter.routing_id = 0;
192 return 0;
193}
194
195int zmq::msg_t::init_join ()
196{
197 _u.base.metadata = NULL;
198 _u.base.type = type_join;
199 _u.base.flags = 0;
200 _u.base.group[0] = '\0';
201 _u.base.routing_id = 0;
202 return 0;
203}
204
205int zmq::msg_t::init_leave ()
206{
207 _u.base.metadata = NULL;
208 _u.base.type = type_leave;
209 _u.base.flags = 0;
210 _u.base.group[0] = '\0';
211 _u.base.routing_id = 0;
212 return 0;
213}
214
215int zmq::msg_t::close ()
216{
217 // Check the validity of the message.
218 if (unlikely (!check ())) {
219 errno = EFAULT;
220 return -1;
221 }
222
223 if (_u.base.type == type_lmsg) {
224 // If the content is not shared, or if it is shared and the reference
225 // count has dropped to zero, deallocate it.
226 if (!(_u.lmsg.flags & msg_t::shared)
227 || !_u.lmsg.content->refcnt.sub (1)) {
228 // We used "placement new" operator to initialize the reference
229 // counter so we call the destructor explicitly now.
230 _u.lmsg.content->refcnt.~atomic_counter_t ();
231
232 if (_u.lmsg.content->ffn)
233 _u.lmsg.content->ffn (_u.lmsg.content->data,
234 _u.lmsg.content->hint);
235 free (_u.lmsg.content);
236 }
237 }
238
239 if (is_zcmsg ()) {
240 zmq_assert (_u.zclmsg.content->ffn);
241
242 // If the content is not shared, or if it is shared and the reference
243 // count has dropped to zero, deallocate it.
244 if (!(_u.zclmsg.flags & msg_t::shared)
245 || !_u.zclmsg.content->refcnt.sub (1)) {
246 // We used "placement new" operator to initialize the reference
247 // counter so we call the destructor explicitly now.
248 _u.zclmsg.content->refcnt.~atomic_counter_t ();
249
250 _u.zclmsg.content->ffn (_u.zclmsg.content->data,
251 _u.zclmsg.content->hint);
252 }
253 }
254
255 if (_u.base.metadata != NULL) {
256 if (_u.base.metadata->drop_ref ()) {
257 LIBZMQ_DELETE (_u.base.metadata);
258 }
259 _u.base.metadata = NULL;
260 }
261
262 // Make the message invalid.
263 _u.base.type = 0;
264
265 return 0;
266}
267
268int zmq::msg_t::move (msg_t &src_)
269{
270 // Check the validity of the source.
271 if (unlikely (!src_.check ())) {
272 errno = EFAULT;
273 return -1;
274 }
275
276 int rc = close ();
277 if (unlikely (rc < 0))
278 return rc;
279
280 *this = src_;
281
282 rc = src_.init ();
283 if (unlikely (rc < 0))
284 return rc;
285
286 return 0;
287}
288
289int zmq::msg_t::copy (msg_t &src_)
290{
291 // Check the validity of the source.
292 if (unlikely (!src_.check ())) {
293 errno = EFAULT;
294 return -1;
295 }
296
297 int rc = close ();
298 if (unlikely (rc < 0))
299 return rc;
300
301 // The initial reference count, when a non-shared message is initially
302 // shared (between the original and the copy we create here).
303 const atomic_counter_t::integer_t initial_shared_refcnt = 2;
304
305 if (src_.is_lmsg () || src_.is_zcmsg ()) {
306 // One reference is added to shared messages. Non-shared messages
307 // are turned into shared messages.
308 if (src_.flags () & msg_t::shared)
309 src_.refcnt ()->add (1);
310 else {
311 src_.set_flags (msg_t::shared);
312 src_.refcnt ()->set (initial_shared_refcnt);
313 }
314 }
315
316 if (src_._u.base.metadata != NULL)
317 src_._u.base.metadata->add_ref ();
318
319 *this = src_;
320
321 return 0;
322}
323
324void *zmq::msg_t::data ()
325{
326 // Check the validity of the message.
327 zmq_assert (check ());
328
329 switch (_u.base.type) {
330 case type_vsm:
331 return _u.vsm.data;
332 case type_lmsg:
333 return _u.lmsg.content->data;
334 case type_cmsg:
335 return _u.cmsg.data;
336 case type_zclmsg:
337 return _u.zclmsg.content->data;
338 default:
339 zmq_assert (false);
340 return NULL;
341 }
342}
343
344size_t zmq::msg_t::size () const
345{
346 // Check the validity of the message.
347 zmq_assert (check ());
348
349 switch (_u.base.type) {
350 case type_vsm:
351 return _u.vsm.size;
352 case type_lmsg:
353 return _u.lmsg.content->size;
354 case type_zclmsg:
355 return _u.zclmsg.content->size;
356 case type_cmsg:
357 return _u.cmsg.size;
358 default:
359 zmq_assert (false);
360 return 0;
361 }
362}
363
364unsigned char zmq::msg_t::flags () const
365{
366 return _u.base.flags;
367}
368
369void zmq::msg_t::set_flags (unsigned char flags_)
370{
371 _u.base.flags |= flags_;
372}
373
374void zmq::msg_t::reset_flags (unsigned char flags_)
375{
376 _u.base.flags &= ~flags_;
377}
378
379zmq::metadata_t *zmq::msg_t::metadata () const
380{
381 return _u.base.metadata;
382}
383
384void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
385{
386 assert (metadata_ != NULL);
387 assert (_u.base.metadata == NULL);
388 metadata_->add_ref ();
389 _u.base.metadata = metadata_;
390}
391
392void zmq::msg_t::reset_metadata ()
393{
394 if (_u.base.metadata) {
395 if (_u.base.metadata->drop_ref ()) {
396 LIBZMQ_DELETE (_u.base.metadata);
397 }
398 _u.base.metadata = NULL;
399 }
400}
401
402bool zmq::msg_t::is_routing_id () const
403{
404 return (_u.base.flags & routing_id) == routing_id;
405}
406
407bool zmq::msg_t::is_credential () const
408{
409 return (_u.base.flags & credential) == credential;
410}
411
412bool zmq::msg_t::is_delimiter () const
413{
414 return _u.base.type == type_delimiter;
415}
416
417bool zmq::msg_t::is_vsm () const
418{
419 return _u.base.type == type_vsm;
420}
421
422bool zmq::msg_t::is_cmsg () const
423{
424 return _u.base.type == type_cmsg;
425}
426
427bool zmq::msg_t::is_lmsg () const
428{
429 return _u.base.type == type_lmsg;
430}
431
432bool zmq::msg_t::is_zcmsg () const
433{
434 return _u.base.type == type_zclmsg;
435}
436
437bool zmq::msg_t::is_join () const
438{
439 return _u.base.type == type_join;
440}
441
442bool zmq::msg_t::is_leave () const
443{
444 return _u.base.type == type_leave;
445}
446
447bool zmq::msg_t::is_ping () const
448{
449 return (_u.base.flags & CMD_TYPE_MASK) == ping;
450}
451
452bool zmq::msg_t::is_pong () const
453{
454 return (_u.base.flags & CMD_TYPE_MASK) == pong;
455}
456
457size_t zmq::msg_t::command_body_size () const
458{
459 if (this->is_ping () || this->is_pong ())
460 return this->size () - ping_cmd_name_size;
461 if (this->is_subscribe ())
462 return this->size () - sub_cmd_name_size;
463 if (this->is_cancel ())
464 return this->size () - cancel_cmd_name_size;
465
466 return 0;
467}
468
469void *zmq::msg_t::command_body ()
470{
471 unsigned char *data = NULL;
472 if (this->is_ping () || this->is_pong ())
473 data =
474 static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
475 if (this->is_subscribe ())
476 data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
477 if (this->is_cancel ())
478 data =
479 static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;
480
481 return data;
482}
483
484void zmq::msg_t::add_refs (int refs_)
485{
486 zmq_assert (refs_ >= 0);
487
488 // Operation not supported for messages with metadata.
489 zmq_assert (_u.base.metadata == NULL);
490
491 // No copies required.
492 if (!refs_)
493 return;
494
495 // VSMs, CMSGS and delimiters can be copied straight away. The only
496 // message type that needs special care are long messages.
497 if (_u.base.type == type_lmsg || is_zcmsg ()) {
498 if (_u.base.flags & msg_t::shared)
499 refcnt ()->add (refs_);
500 else {
501 refcnt ()->set (refs_ + 1);
502 _u.base.flags |= msg_t::shared;
503 }
504 }
505}
506
507bool zmq::msg_t::rm_refs (int refs_)
508{
509 zmq_assert (refs_ >= 0);
510
511 // Operation not supported for messages with metadata.
512 zmq_assert (_u.base.metadata == NULL);
513
514 // No copies required.
515 if (!refs_)
516 return true;
517
518 // If there's only one reference close the message.
519 if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg)
520 || !(_u.base.flags & msg_t::shared)) {
521 close ();
522 return false;
523 }
524
525 // The only message type that needs special care are long and zcopy messages.
526 if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) {
527 // We used "placement new" operator to initialize the reference
528 // counter so we call the destructor explicitly now.
529 _u.lmsg.content->refcnt.~atomic_counter_t ();
530
531 if (_u.lmsg.content->ffn)
532 _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint);
533 free (_u.lmsg.content);
534
535 return false;
536 }
537
538 if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) {
539 // storage for rfcnt is provided externally
540 if (_u.zclmsg.content->ffn) {
541 _u.zclmsg.content->ffn (_u.zclmsg.content->data,
542 _u.zclmsg.content->hint);
543 }
544
545 return false;
546 }
547
548 return true;
549}
550
551uint32_t zmq::msg_t::get_routing_id ()
552{
553 return _u.base.routing_id;
554}
555
556int zmq::msg_t::set_routing_id (uint32_t routing_id_)
557{
558 if (routing_id_) {
559 _u.base.routing_id = routing_id_;
560 return 0;
561 }
562 errno = EINVAL;
563 return -1;
564}
565
566int zmq::msg_t::reset_routing_id ()
567{
568 _u.base.routing_id = 0;
569 return 0;
570}
571
572const char *zmq::msg_t::group ()
573{
574 return _u.base.group;
575}
576
577int zmq::msg_t::set_group (const char *group_)
578{
579 return set_group (group_, ZMQ_GROUP_MAX_LENGTH);
580}
581
582int zmq::msg_t::set_group (const char *group_, size_t length_)
583{
584 if (length_ > ZMQ_GROUP_MAX_LENGTH) {
585 errno = EINVAL;
586 return -1;
587 }
588
589 strncpy (_u.base.group, group_, length_);
590 _u.base.group[length_] = '\0';
591
592 return 0;
593}
594
595zmq::atomic_counter_t *zmq::msg_t::refcnt ()
596{
597 switch (_u.base.type) {
598 case type_lmsg:
599 return &_u.lmsg.content->refcnt;
600 case type_zclmsg:
601 return &_u.zclmsg.content->refcnt;
602 default:
603 zmq_assert (false);
604 return NULL;
605 }
606}
607