1/*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <stdlib.h>
32#include <string.h>
33#include <cmath>
34
35#include "ws_protocol.hpp"
36#include "ws_decoder.hpp"
37#include "likely.hpp"
38#include "wire.hpp"
39#include "err.hpp"
40
41zmq::ws_decoder_t::ws_decoder_t (size_t bufsize_,
42 int64_t maxmsgsize_,
43 bool zero_copy_,
44 bool must_mask_) :
45 decoder_base_t<ws_decoder_t, shared_message_memory_allocator> (bufsize_),
46 _msg_flags (0),
47 _zero_copy (zero_copy_),
48 _max_msg_size (maxmsgsize_),
49 _must_mask (must_mask_),
50 _size (0)
51{
52 memset (_tmpbuf, 0, sizeof (_tmpbuf));
53 int rc = _in_progress.init ();
54 errno_assert (rc == 0);
55
56 // At the beginning, read one byte and go to opcode_ready state.
57 next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
58}
59
60zmq::ws_decoder_t::~ws_decoder_t ()
61{
62 int rc = _in_progress.close ();
63 errno_assert (rc == 0);
64}
65
66int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
67{
68 bool final = (_tmpbuf[0] & 0x80) != 0; // final bit
69 if (!final)
70 return -1; // non final messages are not supported
71
72 _opcode = static_cast<zmq::ws_protocol_t::opcode_t> (_tmpbuf[0] & 0xF);
73
74 _msg_flags = 0;
75
76 switch (_opcode) {
77 case zmq::ws_protocol_t::opcode_binary:
78 break;
79 case zmq::ws_protocol_t::opcode_close:
80 _msg_flags = msg_t::command; // TODO: set the command name to CLOSE
81 break;
82 case zmq::ws_protocol_t::opcode_ping:
83 _msg_flags = msg_t::ping;
84 break;
85 case zmq::ws_protocol_t::opcode_pong:
86 _msg_flags = msg_t::pong;
87 break;
88 default:
89 return -1;
90 }
91
92 next_step (_tmpbuf, 1, &ws_decoder_t::size_first_byte_ready);
93
94 return 0;
95}
96
97int zmq::ws_decoder_t::size_first_byte_ready (unsigned char const *read_from_)
98{
99 bool is_masked = (_tmpbuf[0] & 0x80) != 0;
100
101 if (is_masked != _must_mask) // wrong mask value
102 return -1;
103
104 _size = static_cast<uint64_t> (_tmpbuf[0] & 0x7F);
105
106 if (_size < 126) {
107 if (_must_mask)
108 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
109 else if (_opcode == ws_protocol_t::opcode_binary) {
110 if (_size == 0)
111 return -1;
112 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
113 } else
114 return size_ready (read_from_);
115 } else if (_size == 126)
116 next_step (_tmpbuf, 2, &ws_decoder_t::short_size_ready);
117 else
118 next_step (_tmpbuf, 8, &ws_decoder_t::long_size_ready);
119
120 return 0;
121}
122
123
124int zmq::ws_decoder_t::short_size_ready (unsigned char const *read_from_)
125{
126 _size = (_tmpbuf[0] << 8) | _tmpbuf[1];
127
128 if (_must_mask)
129 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
130 else if (_opcode == ws_protocol_t::opcode_binary) {
131 if (_size == 0)
132 return -1;
133 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
134 } else
135 return size_ready (read_from_);
136
137 return 0;
138}
139
140int zmq::ws_decoder_t::long_size_ready (unsigned char const *read_from_)
141{
142 // The payload size is encoded as 64-bit unsigned integer.
143 // The most significant byte comes first.
144 _size = get_uint64 (_tmpbuf);
145
146 if (_must_mask)
147 next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready);
148 else if (_opcode == ws_protocol_t::opcode_binary) {
149 if (_size == 0)
150 return -1;
151 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
152 } else
153 return size_ready (read_from_);
154
155 return 0;
156}
157
158int zmq::ws_decoder_t::mask_ready (unsigned char const *read_from_)
159{
160 memcpy (_mask, _tmpbuf, 4);
161
162 if (_opcode == ws_protocol_t::opcode_binary) {
163 if (_size == 0)
164 return -1;
165
166 next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready);
167 } else
168 return size_ready (read_from_);
169
170 return 0;
171}
172
173int zmq::ws_decoder_t::flags_ready (unsigned char const *read_from_)
174{
175 unsigned char flags;
176
177 if (_must_mask)
178 flags = _tmpbuf[0] ^ _mask[0];
179 else
180 flags = _tmpbuf[0];
181
182 if (flags & ws_protocol_t::more_flag)
183 _msg_flags |= msg_t::more;
184 if (flags & ws_protocol_t::command_flag)
185 _msg_flags |= msg_t::command;
186
187 _size--;
188
189 return size_ready (read_from_);
190}
191
192
193int zmq::ws_decoder_t::size_ready (unsigned char const *read_pos_)
194{
195 // Message size must not exceed the maximum allowed size.
196 if (_max_msg_size >= 0)
197 if (unlikely (_size > static_cast<uint64_t> (_max_msg_size))) {
198 errno = EMSGSIZE;
199 return -1;
200 }
201
202 // Message size must fit into size_t data type.
203 if (unlikely (_size != static_cast<size_t> (_size))) {
204 errno = EMSGSIZE;
205 return -1;
206 }
207
208 int rc = _in_progress.close ();
209 assert (rc == 0);
210
211 // the current message can exceed the current buffer. We have to copy the buffer
212 // data into a new message and complete it in the next receive.
213
214 shared_message_memory_allocator &allocator = get_allocator ();
215 if (unlikely (!_zero_copy
216 || _size > (size_t) (allocator.data () + allocator.size ()
217 - read_pos_))) {
218 // a new message has started, but the size would exceed the pre-allocated arena
219 // this happens every time when a message does not fit completely into the buffer
220 rc = _in_progress.init_size (static_cast<size_t> (_size));
221 } else {
222 // construct message using n bytes from the buffer as storage
223 // increase buffer ref count
224 // if the message will be a large message, pass a valid refcnt memory location as well
225 rc = _in_progress.init (
226 const_cast<unsigned char *> (read_pos_), static_cast<size_t> (_size),
227 shared_message_memory_allocator::call_dec_ref, allocator.buffer (),
228 allocator.provide_content ());
229
230 // For small messages, data has been copied and refcount does not have to be increased
231 if (_in_progress.is_zcmsg ()) {
232 allocator.advance_content ();
233 allocator.inc_ref ();
234 }
235 }
236
237 if (unlikely (rc)) {
238 errno_assert (errno == ENOMEM);
239 rc = _in_progress.init ();
240 errno_assert (rc == 0);
241 errno = ENOMEM;
242 return -1;
243 }
244
245 _in_progress.set_flags (_msg_flags);
246 // this sets read_pos to
247 // the message data address if the data needs to be copied
248 // for small message / messages exceeding the current buffer
249 // or
250 // to the current start address in the buffer because the message
251 // was constructed to use n bytes from the address passed as argument
252 next_step (_in_progress.data (), _in_progress.size (),
253 &ws_decoder_t::message_ready);
254
255 return 0;
256}
257
258int zmq::ws_decoder_t::message_ready (unsigned char const *)
259{
260 if (_must_mask) {
261 int mask_index = _opcode == ws_protocol_t::opcode_binary ? 1 : 0;
262
263 unsigned char *data =
264 static_cast<unsigned char *> (_in_progress.data ());
265 for (size_t i = 0; i < _size; ++i, mask_index++)
266 data[i] = data[i] ^ _mask[mask_index % 4];
267 }
268
269 // Message is completely read. Signal this to the caller
270 // and prepare to decode next message.
271 next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready);
272 return 1;
273}
274