1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_PIPE_HPP_INCLUDED__ |
31 | #define __ZMQ_PIPE_HPP_INCLUDED__ |
32 | |
33 | #include "ypipe_base.hpp" |
34 | #include "config.hpp" |
35 | #include "object.hpp" |
36 | #include "stdint.hpp" |
37 | #include "array.hpp" |
38 | #include "blob.hpp" |
39 | #include "options.hpp" |
40 | #include "endpoint.hpp" |
41 | |
42 | namespace zmq |
43 | { |
44 | class msg_t; |
45 | class pipe_t; |
46 | |
47 | // Create a pipepair for bi-directional transfer of messages. |
48 | // First HWM is for messages passed from first pipe to the second pipe. |
49 | // Second HWM is for messages passed from second pipe to the first pipe. |
50 | // Delay specifies how the pipe behaves when the peer terminates. If true |
51 | // pipe receives all the pending messages before terminating, otherwise it |
52 | // terminates straight away. |
53 | // If conflate is true, only the most recently arrived message could be |
54 | // read (older messages are discarded) |
55 | int pipepair (zmq::object_t *parents_[2], |
56 | zmq::pipe_t *pipes_[2], |
57 | int hwms_[2], |
58 | bool conflate_[2]); |
59 | |
60 | struct i_pipe_events |
61 | { |
62 | virtual ~i_pipe_events () ZMQ_DEFAULT; |
63 | |
64 | virtual void read_activated (zmq::pipe_t *pipe_) = 0; |
65 | virtual void write_activated (zmq::pipe_t *pipe_) = 0; |
66 | virtual void hiccuped (zmq::pipe_t *pipe_) = 0; |
67 | virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0; |
68 | }; |
69 | |
70 | // Note that pipe can be stored in three different arrays. |
71 | // The array of inbound pipes (1), the array of outbound pipes (2) and |
72 | // the generic array of pipes to be deallocated (3). |
73 | |
74 | class pipe_t : public object_t, |
75 | public array_item_t<1>, |
76 | public array_item_t<2>, |
77 | public array_item_t<3> |
78 | { |
79 | // This allows pipepair to create pipe objects. |
80 | friend int pipepair (zmq::object_t *parents_[2], |
81 | zmq::pipe_t *pipes_[2], |
82 | int hwms_[2], |
83 | bool conflate_[2]); |
84 | |
85 | public: |
86 | // Specifies the object to send events to. |
87 | void set_event_sink (i_pipe_events *sink_); |
88 | |
89 | // Pipe endpoint can store an routing ID to be used by its clients. |
90 | void set_server_socket_routing_id (uint32_t server_socket_routing_id_); |
91 | uint32_t get_server_socket_routing_id () const; |
92 | |
93 | // Pipe endpoint can store an opaque ID to be used by its clients. |
94 | void set_router_socket_routing_id (const blob_t &router_socket_routing_id_); |
95 | const blob_t &get_routing_id () const; |
96 | |
97 | // Returns true if there is at least one message to read in the pipe. |
98 | bool check_read (); |
99 | |
100 | // Reads a message to the underlying pipe. |
101 | bool read (msg_t *msg_); |
102 | |
103 | // Checks whether messages can be written to the pipe. If the pipe is |
104 | // closed or if writing the message would cause high watermark the |
105 | // function returns false. |
106 | bool check_write (); |
107 | |
108 | // Writes a message to the underlying pipe. Returns false if the |
109 | // message does not pass check_write. If false, the message object |
110 | // retains ownership of its message buffer. |
111 | bool write (msg_t *msg_); |
112 | |
113 | // Remove unfinished parts of the outbound message from the pipe. |
114 | void rollback () const; |
115 | |
116 | // Flush the messages downstream. |
117 | void flush (); |
118 | |
119 | // Temporarily disconnects the inbound message stream and drops |
120 | // all the messages on the fly. Causes 'hiccuped' event to be generated |
121 | // in the peer. |
122 | void hiccup (); |
123 | |
124 | // Ensure the pipe won't block on receiving pipe_term. |
125 | void set_nodelay (); |
126 | |
127 | // Ask pipe to terminate. The termination will happen asynchronously |
128 | // and user will be notified about actual deallocation by 'terminated' |
129 | // event. If delay is true, the pending messages will be processed |
130 | // before actual shutdown. |
131 | void terminate (bool delay_); |
132 | |
133 | // Set the high water marks. |
134 | void set_hwms (int inhwm_, int outhwm_); |
135 | |
136 | // Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks |
137 | void set_hwms_boost (int inhwmboost_, int outhwmboost_); |
138 | |
139 | // send command to peer for notify the change of hwm |
140 | void send_hwms_to_peer (int inhwm_, int outhwm_); |
141 | |
142 | // Returns true if HWM is not reached |
143 | bool check_hwm () const; |
144 | |
145 | void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_); |
146 | const endpoint_uri_pair_t &get_endpoint_pair () const; |
147 | |
148 | void send_stats_to_peer (own_t *socket_base_); |
149 | |
150 | private: |
151 | // Type of the underlying lock-free pipe. |
152 | typedef ypipe_base_t<msg_t> upipe_t; |
153 | |
154 | // Command handlers. |
155 | void process_activate_read (); |
156 | void process_activate_write (uint64_t msgs_read_); |
157 | void process_hiccup (void *pipe_); |
158 | void process_pipe_peer_stats (uint64_t queue_count_, |
159 | own_t *socket_base_, |
160 | endpoint_uri_pair_t *endpoint_pair_); |
161 | void process_pipe_term (); |
162 | void process_pipe_term_ack (); |
163 | void process_pipe_hwm (int inhwm_, int outhwm_); |
164 | |
165 | // Handler for delimiter read from the pipe. |
166 | void process_delimiter (); |
167 | |
168 | // Constructor is private. Pipe can only be created using |
169 | // pipepair function. |
170 | pipe_t (object_t *parent_, |
171 | upipe_t *inpipe_, |
172 | upipe_t *outpipe_, |
173 | int inhwm_, |
174 | int outhwm_, |
175 | bool conflate_); |
176 | |
177 | // Pipepair uses this function to let us know about |
178 | // the peer pipe object. |
179 | void set_peer (pipe_t *peer_); |
180 | |
181 | // Destructor is private. Pipe objects destroy themselves. |
182 | ~pipe_t (); |
183 | |
184 | // Underlying pipes for both directions. |
185 | upipe_t *_in_pipe; |
186 | upipe_t *_out_pipe; |
187 | |
188 | // Can the pipe be read from / written to? |
189 | bool _in_active; |
190 | bool _out_active; |
191 | |
192 | // High watermark for the outbound pipe. |
193 | int _hwm; |
194 | |
195 | // Low watermark for the inbound pipe. |
196 | int _lwm; |
197 | |
198 | // boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe |
199 | int _in_hwm_boost; |
200 | int _out_hwm_boost; |
201 | |
202 | // Number of messages read and written so far. |
203 | uint64_t _msgs_read; |
204 | uint64_t _msgs_written; |
205 | |
206 | // Last received peer's msgs_read. The actual number in the peer |
207 | // can be higher at the moment. |
208 | uint64_t _peers_msgs_read; |
209 | |
210 | // The pipe object on the other side of the pipepair. |
211 | pipe_t *_peer; |
212 | |
213 | // Sink to send events to. |
214 | i_pipe_events *_sink; |
215 | |
216 | // States of the pipe endpoint: |
217 | // active: common state before any termination begins, |
218 | // delimiter_received: delimiter was read from pipe before |
219 | // term command was received, |
220 | // waiting_for_delimiter: term command was already received |
221 | // from the peer but there are still pending messages to read, |
222 | // term_ack_sent: all pending messages were already read and |
223 | // all we are waiting for is ack from the peer, |
224 | // term_req_sent1: 'terminate' was explicitly called by the user, |
225 | // term_req_sent2: user called 'terminate' and then we've got |
226 | // term command from the peer as well. |
227 | enum |
228 | { |
229 | active, |
230 | delimiter_received, |
231 | waiting_for_delimiter, |
232 | term_ack_sent, |
233 | term_req_sent1, |
234 | term_req_sent2 |
235 | } _state; |
236 | |
237 | // If true, we receive all the pending inbound messages before |
238 | // terminating. If false, we terminate immediately when the peer |
239 | // asks us to. |
240 | bool _delay; |
241 | |
242 | // Routing id of the writer. Used uniquely by the reader side. |
243 | blob_t _router_socket_routing_id; |
244 | |
245 | // Routing id of the writer. Used uniquely by the reader side. |
246 | int _server_socket_routing_id; |
247 | |
248 | // Returns true if the message is delimiter; false otherwise. |
249 | static bool is_delimiter (const msg_t &msg_); |
250 | |
251 | // Computes appropriate low watermark from the given high watermark. |
252 | static int compute_lwm (int hwm_); |
253 | |
254 | const bool _conflate; |
255 | |
256 | // The endpoints of this pipe. |
257 | endpoint_uri_pair_t _endpoint_pair; |
258 | |
259 | ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t) |
260 | }; |
261 | |
262 | void send_routing_id (pipe_t *pipe_, const options_t &options_); |
263 | } |
264 | |
265 | #endif |
266 | |