1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#ifndef __ZMQ_MSG_HPP_INCLUDE__
31#define __ZMQ_MSG_HPP_INCLUDE__
32
33#include <stddef.h>
34#include <stdio.h>
35
36#include "config.hpp"
37#include "err.hpp"
38#include "fd.hpp"
39#include "atomic_counter.hpp"
40#include "metadata.hpp"
41
// Mask selecting the command-type field inside the msg_t flags byte.
// 0x1c == 0b00011100, i.e. bits 2-4 when counting from bit 0.
#define CMD_TYPE_MASK 0x1c
44
// Signature for free function to deallocate the message content.
// Note that it has to be declared as "C" so that it is the same as
// zmq_free_fn defined in zmq.h.
// The typedef names a function type (not a pointer type), so users declare
// 'msg_free_fn *' to hold a callback.
// NOTE(review): data_ is presumably the buffer to release and hint_ the
// opaque value stored next to it in msg_t::content_t - confirm in msg.cpp.
extern "C" {
typedef void(msg_free_fn) (void *data_, void *hint_);
}
51
52namespace zmq
53{
54// Note that this structure needs to be explicitly constructed
55// (init functions) and destructed (close function).
56
// Message object. All state lives in the private union '_u' at the bottom
// of the class; every variant of that union carries its own padding array
// sized from msg_t_size, so the variants are meant to occupy the same
// number of bytes. No constructor or destructor is declared here: objects
// are set up with one of the init* functions and released with close ().
class msg_t
{
  public:
    // Shared message buffer. Message data are either allocated in one
    // continuous block along with this structure - thus avoiding one
    // malloc/free pair - or they are stored in user-supplied memory.
    // In the latter case, ffn member stores pointer to the function to be
    // used to deallocate the data. If the buffer is actually shared (there
    // are at least 2 references to it) refcount member contains number of
    // references.
    struct content_t
    {
        void *data;
        size_t size;
        msg_free_fn *ffn;
        void *hint;
        zmq::atomic_counter_t refcnt;
    };

    // Message flags.
    enum
    {
        more = 1,    // Followed by more parts
        command = 2, // Command frame (see ZMTP spec)
        // Command types, use only bits 2-4 (CMD_TYPE_MASK, 0x1c) and compare
        // with ==, not bitwise: a command can never be of more than one type
        // at the same time, and the values overlap bitwise (subscribe == 12
        // shares bits with ping == 4 and pong == 8).
        ping = 4,
        pong = 8,
        subscribe = 12,
        cancel = 16,
        credential = 32,
        routing_id = 64,
        shared = 128
    };

    // NOTE(review): the member functions below are only declared in this
    // header; the behaviour notes on them are derived from their names and
    // signatures and should be confirmed against the implementation file.

    // Consistency check on the message object.
    bool check () const;

    // Initialisation functions. Each returns an int status code.
    // NOTE(review): presumably 0 on success and -1 with errno set on
    // failure, matching close () as used by close_and_return - confirm.
    int init ();

    int init (void *data_,
              size_t size_,
              msg_free_fn *ffn_,
              void *hint_,
              content_t *content_ = NULL);

    int init_size (size_t size_);
    int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
    int init_external_storage (content_t *content_,
                               void *data_,
                               size_t size_,
                               msg_free_fn *ffn_,
                               void *hint_);
    int init_delimiter ();
    int init_join ();
    int init_leave ();

    // Releases the message. Callers in this header (close_and_return) treat
    // any return value other than 0 as a fatal error.
    int close ();
    int move (msg_t &src_);
    int copy (msg_t &src_);

    // Payload access.
    void *data ();
    size_t size () const;

    // Flag byte access; flags_ values come from the anonymous flags enum
    // above.
    unsigned char flags () const;
    void set_flags (unsigned char flags_);
    void reset_flags (unsigned char flags_);

    // Metadata pointer access (every union variant starts with a
    // metadata_t * member).
    metadata_t *metadata () const;
    void set_metadata (metadata_t *metadata_);
    void reset_metadata ();

    // Predicates on the message kind.
    bool is_routing_id () const;
    bool is_credential () const;
    bool is_delimiter () const;
    bool is_join () const;
    bool is_leave () const;
    bool is_ping () const;
    bool is_pong () const;

    // These are called on each message received by the session_base class,
    // so get them inlined to avoid the overhead of 2 function calls per msg.
    // The masked command-type field is compared for equality as a whole,
    // because the command-type values are not independent bits.
    inline bool is_subscribe () const
    {
        return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
    }
    inline bool is_cancel () const
    {
        return (_u.base.flags & CMD_TYPE_MASK) == cancel;
    }

    // Command frame body access.
    size_t command_body_size () const;
    void *command_body ();

    // Predicates on the storage variant (see type_t below).
    bool is_vsm () const;
    bool is_cmsg () const;
    bool is_lmsg () const;
    bool is_zcmsg () const;

    // Accessors for the routing_id and group fields that close every union
    // variant.
    uint32_t get_routing_id ();
    int set_routing_id (uint32_t routing_id_);
    int reset_routing_id ();
    const char *group ();
    int set_group (const char *group_);
    int set_group (const char *, size_t length_);

    // After calling this function you can copy the message in POD-style
    // refs_ times. No need to call copy.
    void add_refs (int refs_);

    // Removes references previously added by add_refs. If the number of
    // references drops to 0, the message is closed and false is returned.
    bool rm_refs (int refs_);

    // Byte budget that each variant of the union below is padded to: every
    // 'unused' array is sized as msg_t_size minus the other members.
    enum
    {
        msg_t_size = 64
    };
    // Size in bytes of the largest message that is still copied around
    // rather than being reference-counted. This is the capacity of the
    // inline 'data' array of the vsm variant: msg_t_size minus the metadata
    // pointer, three one-byte fields (size, type, flags), the 16-byte group
    // and the routing id.
    enum
    {
        max_vsm_size =
          msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
    };
    // Encoded command name lengths. NOTE(review): per the trailing comments
    // each value looks like one length byte plus the name itself - confirm
    // against the ZMTP spec.
    enum
    {
        ping_cmd_name_size = 5,   // 4PING
        cancel_cmd_name_size = 7, // 6CANCEL
        sub_cmd_name_size = 10    // 9SUBSCRIBE
    };

  private:
    zmq::atomic_counter_t *refcnt ();

    // Different message types. The value is kept in the one-byte 'type'
    // member of the union; type_min and type_max bracket the valid range.
    enum type_t
    {
        type_min = 101,
        // VSM messages store the content in the message itself
        type_vsm = 101,
        // LMSG messages store the content in malloc-ed memory
        type_lmsg = 102,
        // Delimiter messages are used in envelopes
        type_delimiter = 103,
        // CMSG messages point to constant data
        type_cmsg = 104,

        // zero-copy LMSG message for v2_decoder
        type_zclmsg = 105,

        // Join message for radio_dish
        type_join = 106,

        // Leave message for radio_dish
        type_leave = 107,

        type_max = 107
    };

    // Note that fields shared between different message types are not
    // moved to the parent class (msg_t). This way we get tighter packing
    // of the data. Shared fields can be accessed via 'base' member of
    // the union.
    //
    // Every variant starts with the metadata pointer and ends with the same
    // trailer (type, flags, group, routing_id); the padding in between is
    // computed so that the trailer is intended to line up across variants.
    // Do not reorder or resize members without re-checking that arithmetic.
    union
    {
        // Common view: only the fields present in every variant.
        struct
        {
            metadata_t *metadata;
            unsigned char
              unused[msg_t_size
                     - (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))];
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } base;
        // Very small message: payload bytes are stored inline in 'data',
        // with the length in the one-byte 'size' member.
        struct
        {
            metadata_t *metadata;
            unsigned char data[max_vsm_size];
            unsigned char size;
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } vsm;
        // Large message: payload is described by a separate content_t.
        struct
        {
            metadata_t *metadata;
            content_t *content;
            unsigned char unused[msg_t_size
                                 - (sizeof (metadata_t *) + sizeof (content_t *)
                                    + 2 + 16 + sizeof (uint32_t))];
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } lmsg;
        // Zero-copy large message: same member layout as lmsg.
        struct
        {
            metadata_t *metadata;
            content_t *content;
            unsigned char unused[msg_t_size
                                 - (sizeof (metadata_t *) + sizeof (content_t *)
                                    + 2 + 16 + sizeof (uint32_t))];
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } zclmsg;
        // Constant-data message: holds a raw pointer and size directly,
        // without a content_t.
        struct
        {
            metadata_t *metadata;
            void *data;
            size_t size;
            unsigned char
              unused[msg_t_size
                     - (sizeof (metadata_t *) + sizeof (void *)
                        + sizeof (size_t) + 2 + 16 + sizeof (uint32_t))];
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } cmsg;
        // Delimiter message: carries no payload fields, same layout as base.
        struct
        {
            metadata_t *metadata;
            unsigned char
              unused[msg_t_size
                     - (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))];
            unsigned char type;
            unsigned char flags;
            char group[16];
            uint32_t routing_id;
        } delimiter;
    } _u;
};
286
287inline int close_and_return (zmq::msg_t *msg_, int echo_)
288{
289 // Since we abort on close failure we preserve errno for success case.
290 int err = errno;
291 const int rc = msg_->close ();
292 errno_assert (rc == 0);
293 errno = err;
294 return echo_;
295}
296
297inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
298{
299 for (int i = 0; i < count_; i++)
300 close_and_return (&msg_[i], 0);
301 return echo_;
302}
303}
304
305#endif
306