1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#ifndef __ZMQ_PIPE_HPP_INCLUDED__
31#define __ZMQ_PIPE_HPP_INCLUDED__
32
33#include "ypipe_base.hpp"
34#include "config.hpp"
35#include "object.hpp"
36#include "stdint.hpp"
37#include "array.hpp"
38#include "blob.hpp"
39#include "options.hpp"
40#include "endpoint.hpp"
41
42namespace zmq
43{
44class msg_t;
45class pipe_t;
46
47// Create a pipepair for bi-directional transfer of messages.
48// First HWM is for messages passed from first pipe to the second pipe.
49// Second HWM is for messages passed from second pipe to the first pipe.
50// Delay specifies how the pipe behaves when the peer terminates. If true
51// pipe receives all the pending messages before terminating, otherwise it
52// terminates straight away.
53// If conflate is true, only the most recently arrived message could be
54// read (older messages are discarded)
55int pipepair (zmq::object_t *parents_[2],
56 zmq::pipe_t *pipes_[2],
57 int hwms_[2],
58 bool conflate_[2]);
59
60struct i_pipe_events
61{
62 virtual ~i_pipe_events () ZMQ_DEFAULT;
63
64 virtual void read_activated (zmq::pipe_t *pipe_) = 0;
65 virtual void write_activated (zmq::pipe_t *pipe_) = 0;
66 virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
67 virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
68};
69
70// Note that pipe can be stored in three different arrays.
71// The array of inbound pipes (1), the array of outbound pipes (2) and
72// the generic array of pipes to be deallocated (3).
73
74class pipe_t : public object_t,
75 public array_item_t<1>,
76 public array_item_t<2>,
77 public array_item_t<3>
78{
79 // This allows pipepair to create pipe objects.
80 friend int pipepair (zmq::object_t *parents_[2],
81 zmq::pipe_t *pipes_[2],
82 int hwms_[2],
83 bool conflate_[2]);
84
85 public:
86 // Specifies the object to send events to.
87 void set_event_sink (i_pipe_events *sink_);
88
89 // Pipe endpoint can store an routing ID to be used by its clients.
90 void set_server_socket_routing_id (uint32_t server_socket_routing_id_);
91 uint32_t get_server_socket_routing_id () const;
92
93 // Pipe endpoint can store an opaque ID to be used by its clients.
94 void set_router_socket_routing_id (const blob_t &router_socket_routing_id_);
95 const blob_t &get_routing_id () const;
96
97 // Returns true if there is at least one message to read in the pipe.
98 bool check_read ();
99
100 // Reads a message to the underlying pipe.
101 bool read (msg_t *msg_);
102
103 // Checks whether messages can be written to the pipe. If the pipe is
104 // closed or if writing the message would cause high watermark the
105 // function returns false.
106 bool check_write ();
107
108 // Writes a message to the underlying pipe. Returns false if the
109 // message does not pass check_write. If false, the message object
110 // retains ownership of its message buffer.
111 bool write (msg_t *msg_);
112
113 // Remove unfinished parts of the outbound message from the pipe.
114 void rollback () const;
115
116 // Flush the messages downstream.
117 void flush ();
118
119 // Temporarily disconnects the inbound message stream and drops
120 // all the messages on the fly. Causes 'hiccuped' event to be generated
121 // in the peer.
122 void hiccup ();
123
124 // Ensure the pipe won't block on receiving pipe_term.
125 void set_nodelay ();
126
127 // Ask pipe to terminate. The termination will happen asynchronously
128 // and user will be notified about actual deallocation by 'terminated'
129 // event. If delay is true, the pending messages will be processed
130 // before actual shutdown.
131 void terminate (bool delay_);
132
133 // Set the high water marks.
134 void set_hwms (int inhwm_, int outhwm_);
135
136 // Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
137 void set_hwms_boost (int inhwmboost_, int outhwmboost_);
138
139 // send command to peer for notify the change of hwm
140 void send_hwms_to_peer (int inhwm_, int outhwm_);
141
142 // Returns true if HWM is not reached
143 bool check_hwm () const;
144
145 void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
146 const endpoint_uri_pair_t &get_endpoint_pair () const;
147
148 void send_stats_to_peer (own_t *socket_base_);
149
150 private:
151 // Type of the underlying lock-free pipe.
152 typedef ypipe_base_t<msg_t> upipe_t;
153
154 // Command handlers.
155 void process_activate_read ();
156 void process_activate_write (uint64_t msgs_read_);
157 void process_hiccup (void *pipe_);
158 void process_pipe_peer_stats (uint64_t queue_count_,
159 own_t *socket_base_,
160 endpoint_uri_pair_t *endpoint_pair_);
161 void process_pipe_term ();
162 void process_pipe_term_ack ();
163 void process_pipe_hwm (int inhwm_, int outhwm_);
164
165 // Handler for delimiter read from the pipe.
166 void process_delimiter ();
167
168 // Constructor is private. Pipe can only be created using
169 // pipepair function.
170 pipe_t (object_t *parent_,
171 upipe_t *inpipe_,
172 upipe_t *outpipe_,
173 int inhwm_,
174 int outhwm_,
175 bool conflate_);
176
177 // Pipepair uses this function to let us know about
178 // the peer pipe object.
179 void set_peer (pipe_t *peer_);
180
181 // Destructor is private. Pipe objects destroy themselves.
182 ~pipe_t ();
183
184 // Underlying pipes for both directions.
185 upipe_t *_in_pipe;
186 upipe_t *_out_pipe;
187
188 // Can the pipe be read from / written to?
189 bool _in_active;
190 bool _out_active;
191
192 // High watermark for the outbound pipe.
193 int _hwm;
194
195 // Low watermark for the inbound pipe.
196 int _lwm;
197
198 // boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe
199 int _in_hwm_boost;
200 int _out_hwm_boost;
201
202 // Number of messages read and written so far.
203 uint64_t _msgs_read;
204 uint64_t _msgs_written;
205
206 // Last received peer's msgs_read. The actual number in the peer
207 // can be higher at the moment.
208 uint64_t _peers_msgs_read;
209
210 // The pipe object on the other side of the pipepair.
211 pipe_t *_peer;
212
213 // Sink to send events to.
214 i_pipe_events *_sink;
215
216 // States of the pipe endpoint:
217 // active: common state before any termination begins,
218 // delimiter_received: delimiter was read from pipe before
219 // term command was received,
220 // waiting_for_delimiter: term command was already received
221 // from the peer but there are still pending messages to read,
222 // term_ack_sent: all pending messages were already read and
223 // all we are waiting for is ack from the peer,
224 // term_req_sent1: 'terminate' was explicitly called by the user,
225 // term_req_sent2: user called 'terminate' and then we've got
226 // term command from the peer as well.
227 enum
228 {
229 active,
230 delimiter_received,
231 waiting_for_delimiter,
232 term_ack_sent,
233 term_req_sent1,
234 term_req_sent2
235 } _state;
236
237 // If true, we receive all the pending inbound messages before
238 // terminating. If false, we terminate immediately when the peer
239 // asks us to.
240 bool _delay;
241
242 // Routing id of the writer. Used uniquely by the reader side.
243 blob_t _router_socket_routing_id;
244
245 // Routing id of the writer. Used uniquely by the reader side.
246 int _server_socket_routing_id;
247
248 // Returns true if the message is delimiter; false otherwise.
249 static bool is_delimiter (const msg_t &msg_);
250
251 // Computes appropriate low watermark from the given high watermark.
252 static int compute_lwm (int hwm_);
253
254 const bool _conflate;
255
256 // The endpoints of this pipe.
257 endpoint_uri_pair_t _endpoint_pair;
258
259 ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t)
260};
261
262void send_routing_id (pipe_t *pipe_, const options_t &options_);
263}
264
265#endif
266