1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "macros.hpp" |
32 | #include "msg.hpp" |
33 | |
34 | #include <string.h> |
35 | #include <stdlib.h> |
36 | #include <new> |
37 | |
38 | #include "stdint.hpp" |
39 | #include "likely.hpp" |
40 | #include "metadata.hpp" |
41 | #include "err.hpp" |
42 | |
43 | // Check whether the sizes of public representation of the message (zmq_msg_t) |
44 | // and private representation of the message (zmq::msg_t) match. |
45 | |
46 | typedef char |
47 | zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) |
48 | - 1]; |
49 | |
50 | bool zmq::msg_t::check () const |
51 | { |
52 | return _u.base.type >= type_min && _u.base.type <= type_max; |
53 | } |
54 | |
55 | int zmq::msg_t::init (void *data_, |
56 | size_t size_, |
57 | msg_free_fn *ffn_, |
58 | void *hint_, |
59 | content_t *content_) |
60 | { |
61 | if (size_ < max_vsm_size) { |
62 | const int rc = init_size (size_); |
63 | |
64 | if (rc != -1) { |
65 | memcpy (data (), data_, size_); |
66 | return 0; |
67 | } |
68 | return -1; |
69 | } |
70 | if (content_) { |
71 | return init_external_storage (content_, data_, size_, ffn_, hint_); |
72 | } |
73 | return init_data (data_, size_, ffn_, hint_); |
74 | } |
75 | |
76 | int zmq::msg_t::init () |
77 | { |
78 | _u.vsm.metadata = NULL; |
79 | _u.vsm.type = type_vsm; |
80 | _u.vsm.flags = 0; |
81 | _u.vsm.size = 0; |
82 | _u.vsm.group[0] = '\0'; |
83 | _u.vsm.routing_id = 0; |
84 | return 0; |
85 | } |
86 | |
87 | int zmq::msg_t::init_size (size_t size_) |
88 | { |
89 | if (size_ <= max_vsm_size) { |
90 | _u.vsm.metadata = NULL; |
91 | _u.vsm.type = type_vsm; |
92 | _u.vsm.flags = 0; |
93 | _u.vsm.size = static_cast<unsigned char> (size_); |
94 | _u.vsm.group[0] = '\0'; |
95 | _u.vsm.routing_id = 0; |
96 | } else { |
97 | _u.lmsg.metadata = NULL; |
98 | _u.lmsg.type = type_lmsg; |
99 | _u.lmsg.flags = 0; |
100 | _u.lmsg.group[0] = '\0'; |
101 | _u.lmsg.routing_id = 0; |
102 | _u.lmsg.content = NULL; |
103 | if (sizeof (content_t) + size_ > size_) |
104 | _u.lmsg.content = |
105 | static_cast<content_t *> (malloc (sizeof (content_t) + size_)); |
106 | if (unlikely (!_u.lmsg.content)) { |
107 | errno = ENOMEM; |
108 | return -1; |
109 | } |
110 | |
111 | _u.lmsg.content->data = _u.lmsg.content + 1; |
112 | _u.lmsg.content->size = size_; |
113 | _u.lmsg.content->ffn = NULL; |
114 | _u.lmsg.content->hint = NULL; |
115 | new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t (); |
116 | } |
117 | return 0; |
118 | } |
119 | |
120 | int zmq::msg_t::init_external_storage (content_t *content_, |
121 | void *data_, |
122 | size_t size_, |
123 | msg_free_fn *ffn_, |
124 | void *hint_) |
125 | { |
126 | zmq_assert (NULL != data_); |
127 | zmq_assert (NULL != content_); |
128 | |
129 | _u.zclmsg.metadata = NULL; |
130 | _u.zclmsg.type = type_zclmsg; |
131 | _u.zclmsg.flags = 0; |
132 | _u.zclmsg.group[0] = '\0'; |
133 | _u.zclmsg.routing_id = 0; |
134 | |
135 | _u.zclmsg.content = content_; |
136 | _u.zclmsg.content->data = data_; |
137 | _u.zclmsg.content->size = size_; |
138 | _u.zclmsg.content->ffn = ffn_; |
139 | _u.zclmsg.content->hint = hint_; |
140 | new (&_u.zclmsg.content->refcnt) zmq::atomic_counter_t (); |
141 | |
142 | return 0; |
143 | } |
144 | |
145 | int zmq::msg_t::init_data (void *data_, |
146 | size_t size_, |
147 | msg_free_fn *ffn_, |
148 | void *hint_) |
149 | { |
150 | // If data is NULL and size is not 0, a segfault |
151 | // would occur once the data is accessed |
152 | zmq_assert (data_ != NULL || size_ == 0); |
153 | |
154 | // Initialize constant message if there's no need to deallocate |
155 | if (ffn_ == NULL) { |
156 | _u.cmsg.metadata = NULL; |
157 | _u.cmsg.type = type_cmsg; |
158 | _u.cmsg.flags = 0; |
159 | _u.cmsg.data = data_; |
160 | _u.cmsg.size = size_; |
161 | _u.cmsg.group[0] = '\0'; |
162 | _u.cmsg.routing_id = 0; |
163 | } else { |
164 | _u.lmsg.metadata = NULL; |
165 | _u.lmsg.type = type_lmsg; |
166 | _u.lmsg.flags = 0; |
167 | _u.lmsg.group[0] = '\0'; |
168 | _u.lmsg.routing_id = 0; |
169 | _u.lmsg.content = |
170 | static_cast<content_t *> (malloc (sizeof (content_t))); |
171 | if (!_u.lmsg.content) { |
172 | errno = ENOMEM; |
173 | return -1; |
174 | } |
175 | |
176 | _u.lmsg.content->data = data_; |
177 | _u.lmsg.content->size = size_; |
178 | _u.lmsg.content->ffn = ffn_; |
179 | _u.lmsg.content->hint = hint_; |
180 | new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t (); |
181 | } |
182 | return 0; |
183 | } |
184 | |
185 | int zmq::msg_t::init_delimiter () |
186 | { |
187 | _u.delimiter.metadata = NULL; |
188 | _u.delimiter.type = type_delimiter; |
189 | _u.delimiter.flags = 0; |
190 | _u.delimiter.group[0] = '\0'; |
191 | _u.delimiter.routing_id = 0; |
192 | return 0; |
193 | } |
194 | |
195 | int zmq::msg_t::init_join () |
196 | { |
197 | _u.base.metadata = NULL; |
198 | _u.base.type = type_join; |
199 | _u.base.flags = 0; |
200 | _u.base.group[0] = '\0'; |
201 | _u.base.routing_id = 0; |
202 | return 0; |
203 | } |
204 | |
205 | int zmq::msg_t::init_leave () |
206 | { |
207 | _u.base.metadata = NULL; |
208 | _u.base.type = type_leave; |
209 | _u.base.flags = 0; |
210 | _u.base.group[0] = '\0'; |
211 | _u.base.routing_id = 0; |
212 | return 0; |
213 | } |
214 | |
215 | int zmq::msg_t::close () |
216 | { |
217 | // Check the validity of the message. |
218 | if (unlikely (!check ())) { |
219 | errno = EFAULT; |
220 | return -1; |
221 | } |
222 | |
223 | if (_u.base.type == type_lmsg) { |
224 | // If the content is not shared, or if it is shared and the reference |
225 | // count has dropped to zero, deallocate it. |
226 | if (!(_u.lmsg.flags & msg_t::shared) |
227 | || !_u.lmsg.content->refcnt.sub (1)) { |
228 | // We used "placement new" operator to initialize the reference |
229 | // counter so we call the destructor explicitly now. |
230 | _u.lmsg.content->refcnt.~atomic_counter_t (); |
231 | |
232 | if (_u.lmsg.content->ffn) |
233 | _u.lmsg.content->ffn (_u.lmsg.content->data, |
234 | _u.lmsg.content->hint); |
235 | free (_u.lmsg.content); |
236 | } |
237 | } |
238 | |
239 | if (is_zcmsg ()) { |
240 | zmq_assert (_u.zclmsg.content->ffn); |
241 | |
242 | // If the content is not shared, or if it is shared and the reference |
243 | // count has dropped to zero, deallocate it. |
244 | if (!(_u.zclmsg.flags & msg_t::shared) |
245 | || !_u.zclmsg.content->refcnt.sub (1)) { |
246 | // We used "placement new" operator to initialize the reference |
247 | // counter so we call the destructor explicitly now. |
248 | _u.zclmsg.content->refcnt.~atomic_counter_t (); |
249 | |
250 | _u.zclmsg.content->ffn (_u.zclmsg.content->data, |
251 | _u.zclmsg.content->hint); |
252 | } |
253 | } |
254 | |
255 | if (_u.base.metadata != NULL) { |
256 | if (_u.base.metadata->drop_ref ()) { |
257 | LIBZMQ_DELETE (_u.base.metadata); |
258 | } |
259 | _u.base.metadata = NULL; |
260 | } |
261 | |
262 | // Make the message invalid. |
263 | _u.base.type = 0; |
264 | |
265 | return 0; |
266 | } |
267 | |
268 | int zmq::msg_t::move (msg_t &src_) |
269 | { |
270 | // Check the validity of the source. |
271 | if (unlikely (!src_.check ())) { |
272 | errno = EFAULT; |
273 | return -1; |
274 | } |
275 | |
276 | int rc = close (); |
277 | if (unlikely (rc < 0)) |
278 | return rc; |
279 | |
280 | *this = src_; |
281 | |
282 | rc = src_.init (); |
283 | if (unlikely (rc < 0)) |
284 | return rc; |
285 | |
286 | return 0; |
287 | } |
288 | |
289 | int zmq::msg_t::copy (msg_t &src_) |
290 | { |
291 | // Check the validity of the source. |
292 | if (unlikely (!src_.check ())) { |
293 | errno = EFAULT; |
294 | return -1; |
295 | } |
296 | |
297 | int rc = close (); |
298 | if (unlikely (rc < 0)) |
299 | return rc; |
300 | |
301 | // The initial reference count, when a non-shared message is initially |
302 | // shared (between the original and the copy we create here). |
303 | const atomic_counter_t::integer_t initial_shared_refcnt = 2; |
304 | |
305 | if (src_.is_lmsg () || src_.is_zcmsg ()) { |
306 | // One reference is added to shared messages. Non-shared messages |
307 | // are turned into shared messages. |
308 | if (src_.flags () & msg_t::shared) |
309 | src_.refcnt ()->add (1); |
310 | else { |
311 | src_.set_flags (msg_t::shared); |
312 | src_.refcnt ()->set (initial_shared_refcnt); |
313 | } |
314 | } |
315 | |
316 | if (src_._u.base.metadata != NULL) |
317 | src_._u.base.metadata->add_ref (); |
318 | |
319 | *this = src_; |
320 | |
321 | return 0; |
322 | } |
323 | |
324 | void *zmq::msg_t::data () |
325 | { |
326 | // Check the validity of the message. |
327 | zmq_assert (check ()); |
328 | |
329 | switch (_u.base.type) { |
330 | case type_vsm: |
331 | return _u.vsm.data; |
332 | case type_lmsg: |
333 | return _u.lmsg.content->data; |
334 | case type_cmsg: |
335 | return _u.cmsg.data; |
336 | case type_zclmsg: |
337 | return _u.zclmsg.content->data; |
338 | default: |
339 | zmq_assert (false); |
340 | return NULL; |
341 | } |
342 | } |
343 | |
344 | size_t zmq::msg_t::size () const |
345 | { |
346 | // Check the validity of the message. |
347 | zmq_assert (check ()); |
348 | |
349 | switch (_u.base.type) { |
350 | case type_vsm: |
351 | return _u.vsm.size; |
352 | case type_lmsg: |
353 | return _u.lmsg.content->size; |
354 | case type_zclmsg: |
355 | return _u.zclmsg.content->size; |
356 | case type_cmsg: |
357 | return _u.cmsg.size; |
358 | default: |
359 | zmq_assert (false); |
360 | return 0; |
361 | } |
362 | } |
363 | |
364 | unsigned char zmq::msg_t::flags () const |
365 | { |
366 | return _u.base.flags; |
367 | } |
368 | |
369 | void zmq::msg_t::set_flags (unsigned char flags_) |
370 | { |
371 | _u.base.flags |= flags_; |
372 | } |
373 | |
374 | void zmq::msg_t::reset_flags (unsigned char flags_) |
375 | { |
376 | _u.base.flags &= ~flags_; |
377 | } |
378 | |
379 | zmq::metadata_t *zmq::msg_t::metadata () const |
380 | { |
381 | return _u.base.metadata; |
382 | } |
383 | |
384 | void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_) |
385 | { |
386 | assert (metadata_ != NULL); |
387 | assert (_u.base.metadata == NULL); |
388 | metadata_->add_ref (); |
389 | _u.base.metadata = metadata_; |
390 | } |
391 | |
392 | void zmq::msg_t::reset_metadata () |
393 | { |
394 | if (_u.base.metadata) { |
395 | if (_u.base.metadata->drop_ref ()) { |
396 | LIBZMQ_DELETE (_u.base.metadata); |
397 | } |
398 | _u.base.metadata = NULL; |
399 | } |
400 | } |
401 | |
402 | bool zmq::msg_t::is_routing_id () const |
403 | { |
404 | return (_u.base.flags & routing_id) == routing_id; |
405 | } |
406 | |
407 | bool zmq::msg_t::is_credential () const |
408 | { |
409 | return (_u.base.flags & credential) == credential; |
410 | } |
411 | |
412 | bool zmq::msg_t::is_delimiter () const |
413 | { |
414 | return _u.base.type == type_delimiter; |
415 | } |
416 | |
417 | bool zmq::msg_t::is_vsm () const |
418 | { |
419 | return _u.base.type == type_vsm; |
420 | } |
421 | |
422 | bool zmq::msg_t::is_cmsg () const |
423 | { |
424 | return _u.base.type == type_cmsg; |
425 | } |
426 | |
427 | bool zmq::msg_t::is_lmsg () const |
428 | { |
429 | return _u.base.type == type_lmsg; |
430 | } |
431 | |
432 | bool zmq::msg_t::is_zcmsg () const |
433 | { |
434 | return _u.base.type == type_zclmsg; |
435 | } |
436 | |
437 | bool zmq::msg_t::is_join () const |
438 | { |
439 | return _u.base.type == type_join; |
440 | } |
441 | |
442 | bool zmq::msg_t::is_leave () const |
443 | { |
444 | return _u.base.type == type_leave; |
445 | } |
446 | |
447 | bool zmq::msg_t::is_ping () const |
448 | { |
449 | return (_u.base.flags & CMD_TYPE_MASK) == ping; |
450 | } |
451 | |
452 | bool zmq::msg_t::is_pong () const |
453 | { |
454 | return (_u.base.flags & CMD_TYPE_MASK) == pong; |
455 | } |
456 | |
457 | size_t zmq::msg_t::command_body_size () const |
458 | { |
459 | if (this->is_ping () || this->is_pong ()) |
460 | return this->size () - ping_cmd_name_size; |
461 | if (this->is_subscribe ()) |
462 | return this->size () - sub_cmd_name_size; |
463 | if (this->is_cancel ()) |
464 | return this->size () - cancel_cmd_name_size; |
465 | |
466 | return 0; |
467 | } |
468 | |
469 | void *zmq::msg_t::command_body () |
470 | { |
471 | unsigned char *data = NULL; |
472 | if (this->is_ping () || this->is_pong ()) |
473 | data = |
474 | static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size; |
475 | if (this->is_subscribe ()) |
476 | data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size; |
477 | if (this->is_cancel ()) |
478 | data = |
479 | static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size; |
480 | |
481 | return data; |
482 | } |
483 | |
484 | void zmq::msg_t::add_refs (int refs_) |
485 | { |
486 | zmq_assert (refs_ >= 0); |
487 | |
488 | // Operation not supported for messages with metadata. |
489 | zmq_assert (_u.base.metadata == NULL); |
490 | |
491 | // No copies required. |
492 | if (!refs_) |
493 | return; |
494 | |
495 | // VSMs, CMSGS and delimiters can be copied straight away. The only |
496 | // message type that needs special care are long messages. |
497 | if (_u.base.type == type_lmsg || is_zcmsg ()) { |
498 | if (_u.base.flags & msg_t::shared) |
499 | refcnt ()->add (refs_); |
500 | else { |
501 | refcnt ()->set (refs_ + 1); |
502 | _u.base.flags |= msg_t::shared; |
503 | } |
504 | } |
505 | } |
506 | |
507 | bool zmq::msg_t::rm_refs (int refs_) |
508 | { |
509 | zmq_assert (refs_ >= 0); |
510 | |
511 | // Operation not supported for messages with metadata. |
512 | zmq_assert (_u.base.metadata == NULL); |
513 | |
514 | // No copies required. |
515 | if (!refs_) |
516 | return true; |
517 | |
518 | // If there's only one reference close the message. |
519 | if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg) |
520 | || !(_u.base.flags & msg_t::shared)) { |
521 | close (); |
522 | return false; |
523 | } |
524 | |
525 | // The only message type that needs special care are long and zcopy messages. |
526 | if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) { |
527 | // We used "placement new" operator to initialize the reference |
528 | // counter so we call the destructor explicitly now. |
529 | _u.lmsg.content->refcnt.~atomic_counter_t (); |
530 | |
531 | if (_u.lmsg.content->ffn) |
532 | _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint); |
533 | free (_u.lmsg.content); |
534 | |
535 | return false; |
536 | } |
537 | |
538 | if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) { |
539 | // storage for rfcnt is provided externally |
540 | if (_u.zclmsg.content->ffn) { |
541 | _u.zclmsg.content->ffn (_u.zclmsg.content->data, |
542 | _u.zclmsg.content->hint); |
543 | } |
544 | |
545 | return false; |
546 | } |
547 | |
548 | return true; |
549 | } |
550 | |
551 | uint32_t zmq::msg_t::get_routing_id () |
552 | { |
553 | return _u.base.routing_id; |
554 | } |
555 | |
556 | int zmq::msg_t::set_routing_id (uint32_t routing_id_) |
557 | { |
558 | if (routing_id_) { |
559 | _u.base.routing_id = routing_id_; |
560 | return 0; |
561 | } |
562 | errno = EINVAL; |
563 | return -1; |
564 | } |
565 | |
566 | int zmq::msg_t::reset_routing_id () |
567 | { |
568 | _u.base.routing_id = 0; |
569 | return 0; |
570 | } |
571 | |
572 | const char *zmq::msg_t::group () |
573 | { |
574 | return _u.base.group; |
575 | } |
576 | |
577 | int zmq::msg_t::set_group (const char *group_) |
578 | { |
579 | return set_group (group_, ZMQ_GROUP_MAX_LENGTH); |
580 | } |
581 | |
582 | int zmq::msg_t::set_group (const char *group_, size_t length_) |
583 | { |
584 | if (length_ > ZMQ_GROUP_MAX_LENGTH) { |
585 | errno = EINVAL; |
586 | return -1; |
587 | } |
588 | |
589 | strncpy (_u.base.group, group_, length_); |
590 | _u.base.group[length_] = '\0'; |
591 | |
592 | return 0; |
593 | } |
594 | |
595 | zmq::atomic_counter_t *zmq::msg_t::refcnt () |
596 | { |
597 | switch (_u.base.type) { |
598 | case type_lmsg: |
599 | return &_u.lmsg.content->refcnt; |
600 | case type_zclmsg: |
601 | return &_u.zclmsg.content->refcnt; |
602 | default: |
603 | zmq_assert (false); |
604 | return NULL; |
605 | } |
606 | } |
607 | |