1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_MSG_HPP_INCLUDE__ |
31 | #define __ZMQ_MSG_HPP_INCLUDE__ |
32 | |
33 | #include <stddef.h> |
34 | #include <stdio.h> |
35 | |
36 | #include "config.hpp" |
37 | #include "err.hpp" |
38 | #include "fd.hpp" |
39 | #include "atomic_counter.hpp" |
40 | #include "metadata.hpp" |
41 | |
42 | // bits 2-5 |
43 | #define CMD_TYPE_MASK 0x1c |
44 | |
45 | // Signature for free function to deallocate the message content. |
46 | // Note that it has to be declared as "C" so that it is the same as |
47 | // zmq_free_fn defined in zmq.h. |
48 | extern "C" { |
49 | typedef void(msg_free_fn) (void *data_, void *hint_); |
50 | } |
51 | |
52 | namespace zmq |
53 | { |
54 | // Note that this structure needs to be explicitly constructed |
55 | // (init functions) and destructed (close function). |
56 | |
57 | class msg_t |
58 | { |
59 | public: |
60 | // Shared message buffer. Message data are either allocated in one |
61 | // continuous block along with this structure - thus avoiding one |
62 | // malloc/free pair or they are stored in user-supplied memory. |
63 | // In the latter case, ffn member stores pointer to the function to be |
64 | // used to deallocate the data. If the buffer is actually shared (there |
65 | // are at least 2 references to it) refcount member contains number of |
66 | // references. |
67 | struct content_t |
68 | { |
69 | void *data; |
70 | size_t size; |
71 | msg_free_fn *ffn; |
72 | void *hint; |
73 | zmq::atomic_counter_t refcnt; |
74 | }; |
75 | |
76 | // Message flags. |
77 | enum |
78 | { |
79 | more = 1, // Followed by more parts |
80 | command = 2, // Command frame (see ZMTP spec) |
81 | // Command types, use only bits 2-5 and compare with ==, not bitwise, |
82 | // a command can never be of more that one type at the same time |
83 | ping = 4, |
84 | pong = 8, |
85 | subscribe = 12, |
86 | cancel = 16, |
87 | credential = 32, |
88 | routing_id = 64, |
89 | shared = 128 |
90 | }; |
91 | |
92 | bool check () const; |
93 | int init (); |
94 | |
95 | int init (void *data_, |
96 | size_t size_, |
97 | msg_free_fn *ffn_, |
98 | void *hint_, |
99 | content_t *content_ = NULL); |
100 | |
101 | int init_size (size_t size_); |
102 | int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); |
103 | int init_external_storage (content_t *content_, |
104 | void *data_, |
105 | size_t size_, |
106 | msg_free_fn *ffn_, |
107 | void *hint_); |
108 | int init_delimiter (); |
109 | int init_join (); |
110 | int init_leave (); |
111 | int close (); |
112 | int move (msg_t &src_); |
113 | int copy (msg_t &src_); |
114 | void *data (); |
115 | size_t size () const; |
116 | unsigned char flags () const; |
117 | void set_flags (unsigned char flags_); |
118 | void reset_flags (unsigned char flags_); |
119 | metadata_t *metadata () const; |
120 | void set_metadata (metadata_t *metadata_); |
121 | void reset_metadata (); |
122 | bool is_routing_id () const; |
123 | bool is_credential () const; |
124 | bool is_delimiter () const; |
125 | bool is_join () const; |
126 | bool is_leave () const; |
127 | bool is_ping () const; |
128 | bool is_pong () const; |
129 | |
130 | // These are called on each message received by the session_base class, |
131 | // so get them inlined to avoid the overhead of 2 function calls per msg |
132 | inline bool is_subscribe () const |
133 | { |
134 | return (_u.base.flags & CMD_TYPE_MASK) == subscribe; |
135 | } |
136 | inline bool is_cancel () const |
137 | { |
138 | return (_u.base.flags & CMD_TYPE_MASK) == cancel; |
139 | } |
140 | |
141 | size_t command_body_size () const; |
142 | void *command_body (); |
143 | bool is_vsm () const; |
144 | bool is_cmsg () const; |
145 | bool is_lmsg () const; |
146 | bool is_zcmsg () const; |
147 | uint32_t get_routing_id (); |
148 | int set_routing_id (uint32_t routing_id_); |
149 | int reset_routing_id (); |
150 | const char *group (); |
151 | int set_group (const char *group_); |
152 | int set_group (const char *, size_t length_); |
153 | |
154 | // After calling this function you can copy the message in POD-style |
155 | // refs_ times. No need to call copy. |
156 | void add_refs (int refs_); |
157 | |
158 | // Removes references previously added by add_refs. If the number of |
159 | // references drops to 0, the message is closed and false is returned. |
160 | bool rm_refs (int refs_); |
161 | |
162 | // Size in bytes of the largest message that is still copied around |
163 | // rather than being reference-counted. |
164 | enum |
165 | { |
166 | msg_t_size = 64 |
167 | }; |
168 | enum |
169 | { |
170 | max_vsm_size = |
171 | msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t)) |
172 | }; |
173 | enum |
174 | { |
175 | ping_cmd_name_size = 5, // 4PING |
176 | cancel_cmd_name_size = 7, // 6CANCEL |
177 | sub_cmd_name_size = 10 // 9SUBSCRIBE |
178 | }; |
179 | |
180 | private: |
181 | zmq::atomic_counter_t *refcnt (); |
182 | |
183 | // Different message types. |
184 | enum type_t |
185 | { |
186 | type_min = 101, |
187 | // VSM messages store the content in the message itself |
188 | type_vsm = 101, |
189 | // LMSG messages store the content in malloc-ed memory |
190 | type_lmsg = 102, |
191 | // Delimiter messages are used in envelopes |
192 | type_delimiter = 103, |
193 | // CMSG messages point to constant data |
194 | type_cmsg = 104, |
195 | |
196 | // zero-copy LMSG message for v2_decoder |
197 | type_zclmsg = 105, |
198 | |
199 | // Join message for radio_dish |
200 | type_join = 106, |
201 | |
202 | // Leave message for radio_dish |
203 | type_leave = 107, |
204 | |
205 | type_max = 107 |
206 | }; |
207 | |
208 | // Note that fields shared between different message types are not |
209 | // moved to the parent class (msg_t). This way we get tighter packing |
210 | // of the data. Shared fields can be accessed via 'base' member of |
211 | // the union. |
212 | union |
213 | { |
214 | struct |
215 | { |
216 | metadata_t *metadata; |
217 | unsigned char |
218 | unused[msg_t_size |
219 | - (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))]; |
220 | unsigned char type; |
221 | unsigned char flags; |
222 | char group[16]; |
223 | uint32_t routing_id; |
224 | } base; |
225 | struct |
226 | { |
227 | metadata_t *metadata; |
228 | unsigned char data[max_vsm_size]; |
229 | unsigned char size; |
230 | unsigned char type; |
231 | unsigned char flags; |
232 | char group[16]; |
233 | uint32_t routing_id; |
234 | } vsm; |
235 | struct |
236 | { |
237 | metadata_t *metadata; |
238 | content_t *content; |
239 | unsigned char unused[msg_t_size |
240 | - (sizeof (metadata_t *) + sizeof (content_t *) |
241 | + 2 + 16 + sizeof (uint32_t))]; |
242 | unsigned char type; |
243 | unsigned char flags; |
244 | char group[16]; |
245 | uint32_t routing_id; |
246 | } lmsg; |
247 | struct |
248 | { |
249 | metadata_t *metadata; |
250 | content_t *content; |
251 | unsigned char unused[msg_t_size |
252 | - (sizeof (metadata_t *) + sizeof (content_t *) |
253 | + 2 + 16 + sizeof (uint32_t))]; |
254 | unsigned char type; |
255 | unsigned char flags; |
256 | char group[16]; |
257 | uint32_t routing_id; |
258 | } zclmsg; |
259 | struct |
260 | { |
261 | metadata_t *metadata; |
262 | void *data; |
263 | size_t size; |
264 | unsigned char |
265 | unused[msg_t_size |
266 | - (sizeof (metadata_t *) + sizeof (void *) |
267 | + sizeof (size_t) + 2 + 16 + sizeof (uint32_t))]; |
268 | unsigned char type; |
269 | unsigned char flags; |
270 | char group[16]; |
271 | uint32_t routing_id; |
272 | } cmsg; |
273 | struct |
274 | { |
275 | metadata_t *metadata; |
276 | unsigned char |
277 | unused[msg_t_size |
278 | - (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))]; |
279 | unsigned char type; |
280 | unsigned char flags; |
281 | char group[16]; |
282 | uint32_t routing_id; |
283 | } delimiter; |
284 | } _u; |
285 | }; |
286 | |
287 | inline int close_and_return (zmq::msg_t *msg_, int echo_) |
288 | { |
289 | // Since we abort on close failure we preserve errno for success case. |
290 | int err = errno; |
291 | const int rc = msg_->close (); |
292 | errno_assert (rc == 0); |
293 | errno = err; |
294 | return echo_; |
295 | } |
296 | |
297 | inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_) |
298 | { |
299 | for (int i = 0; i < count_; i++) |
300 | close_and_return (&msg_[i], 0); |
301 | return echo_; |
302 | } |
303 | } |
304 | |
305 | #endif |
306 | |