1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_DECODER_HPP_INCLUDED__ |
31 | #define __ZMQ_DECODER_HPP_INCLUDED__ |
32 | |
33 | #include <algorithm> |
34 | #include <cstddef> |
35 | #include <cstring> |
36 | |
37 | #include "decoder_allocators.hpp" |
38 | #include "err.hpp" |
39 | #include "i_decoder.hpp" |
40 | #include "stdint.hpp" |
41 | |
42 | namespace zmq |
43 | { |
44 | // Helper base class for decoders that know the amount of data to read |
45 | // in advance at any moment. Knowing the amount in advance is a property |
// of the protocol used. The 0MQ framing protocol is based on a
// size-prefixed paradigm, which qualifies it to be parsed by this class.
48 | // On the other hand, XML-based transports (like XMPP or SOAP) don't allow |
49 | // for knowing the size of data to read in advance and should use different |
50 | // decoding algorithms. |
51 | // |
52 | // This class implements the state machine that parses the incoming buffer. |
53 | // Derived class should implement individual state machine actions. |
54 | // |
55 | // Buffer management is done by an allocator policy. |
56 | template <typename T, typename A = c_single_allocator> |
57 | class decoder_base_t : public i_decoder |
58 | { |
59 | public: |
60 | explicit decoder_base_t (const size_t buf_size_) : |
61 | _next (NULL), |
62 | _read_pos (NULL), |
63 | _to_read (0), |
64 | _allocator (buf_size_) |
65 | { |
66 | _buf = _allocator.allocate (); |
67 | } |
68 | |
69 | // The destructor doesn't have to be virtual. It is made virtual |
70 | // just to keep ICC and code checking tools from complaining. |
71 | virtual ~decoder_base_t () { _allocator.deallocate (); } |
72 | |
73 | // Returns a buffer to be filled with binary data. |
74 | void get_buffer (unsigned char **data_, std::size_t *size_) |
75 | { |
76 | _buf = _allocator.allocate (); |
77 | |
78 | // If we are expected to read large message, we'll opt for zero- |
79 | // copy, i.e. we'll ask caller to fill the data directly to the |
80 | // message. Note that subsequent read(s) are non-blocking, thus |
81 | // each single read reads at most SO_RCVBUF bytes at once not |
82 | // depending on how large is the chunk returned from here. |
83 | // As a consequence, large messages being received won't block |
84 | // other engines running in the same I/O thread for excessive |
85 | // amounts of time. |
86 | if (_to_read >= _allocator.size ()) { |
87 | *data_ = _read_pos; |
88 | *size_ = _to_read; |
89 | return; |
90 | } |
91 | |
92 | *data_ = _buf; |
93 | *size_ = _allocator.size (); |
94 | } |
95 | |
96 | // Processes the data in the buffer previously allocated using |
97 | // get_buffer function. size_ argument specifies number of bytes |
98 | // actually filled into the buffer. Function returns 1 when the |
99 | // whole message was decoded or 0 when more data is required. |
100 | // On error, -1 is returned and errno set accordingly. |
101 | // Number of bytes processed is returned in bytes_used_. |
102 | int decode (const unsigned char *data_, |
103 | std::size_t size_, |
104 | std::size_t &bytes_used_) |
105 | { |
106 | bytes_used_ = 0; |
107 | |
108 | // In case of zero-copy simply adjust the pointers, no copying |
109 | // is required. Also, run the state machine in case all the data |
110 | // were processed. |
111 | if (data_ == _read_pos) { |
112 | zmq_assert (size_ <= _to_read); |
113 | _read_pos += size_; |
114 | _to_read -= size_; |
115 | bytes_used_ = size_; |
116 | |
117 | while (!_to_read) { |
118 | const int rc = |
119 | (static_cast<T *> (this)->*_next) (data_ + bytes_used_); |
120 | if (rc != 0) |
121 | return rc; |
122 | } |
123 | return 0; |
124 | } |
125 | |
126 | while (bytes_used_ < size_) { |
127 | // Copy the data from buffer to the message. |
128 | const size_t to_copy = std::min (_to_read, size_ - bytes_used_); |
129 | // Only copy when destination address is different from the |
130 | // current address in the buffer. |
131 | if (_read_pos != data_ + bytes_used_) { |
132 | memcpy (_read_pos, data_ + bytes_used_, to_copy); |
133 | } |
134 | |
135 | _read_pos += to_copy; |
136 | _to_read -= to_copy; |
137 | bytes_used_ += to_copy; |
138 | // Try to get more space in the message to fill in. |
139 | // If none is available, return. |
140 | while (_to_read == 0) { |
141 | // pass current address in the buffer |
142 | const int rc = |
143 | (static_cast<T *> (this)->*_next) (data_ + bytes_used_); |
144 | if (rc != 0) |
145 | return rc; |
146 | } |
147 | } |
148 | |
149 | return 0; |
150 | } |
151 | |
152 | virtual void resize_buffer (std::size_t new_size_) |
153 | { |
154 | _allocator.resize (new_size_); |
155 | } |
156 | |
157 | protected: |
158 | // Prototype of state machine action. Action should return false if |
159 | // it is unable to push the data to the system. |
160 | typedef int (T::*step_t) (unsigned char const *); |
161 | |
162 | // This function should be called from derived class to read data |
163 | // from the buffer and schedule next state machine action. |
164 | void next_step (void *read_pos_, std::size_t to_read_, step_t next_) |
165 | { |
166 | _read_pos = static_cast<unsigned char *> (read_pos_); |
167 | _to_read = to_read_; |
168 | _next = next_; |
169 | } |
170 | |
171 | A &get_allocator () { return _allocator; } |
172 | |
173 | private: |
174 | // Next step. If set to NULL, it means that associated data stream |
175 | // is dead. Note that there can be still data in the process in such |
176 | // case. |
177 | step_t _next; |
178 | |
179 | // Where to store the read data. |
180 | unsigned char *_read_pos; |
181 | |
182 | // How much data to read before taking next step. |
183 | std::size_t _to_read; |
184 | |
185 | // The duffer for data to decode. |
186 | A _allocator; |
187 | unsigned char *_buf; |
188 | |
189 | ZMQ_NON_COPYABLE_NOR_MOVABLE (decoder_base_t) |
190 | }; |
191 | } |
192 | |
193 | #endif |
194 | |