1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include <new> |
32 | #include <stddef.h> |
33 | |
34 | #include "macros.hpp" |
35 | #include "pipe.hpp" |
36 | #include "err.hpp" |
37 | |
38 | #include "ypipe.hpp" |
39 | #include "ypipe_conflate.hpp" |
40 | |
41 | int zmq::pipepair (class object_t *parents_[2], |
42 | class pipe_t *pipes_[2], |
43 | int hwms_[2], |
44 | bool conflate_[2]) |
45 | { |
46 | // Creates two pipe objects. These objects are connected by two ypipes, |
47 | // each to pass messages in one direction. |
48 | |
49 | typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t; |
50 | typedef ypipe_conflate_t<msg_t> upipe_conflate_t; |
51 | |
52 | pipe_t::upipe_t *upipe1; |
53 | if (conflate_[0]) |
54 | upipe1 = new (std::nothrow) upipe_conflate_t (); |
55 | else |
56 | upipe1 = new (std::nothrow) upipe_normal_t (); |
57 | alloc_assert (upipe1); |
58 | |
59 | pipe_t::upipe_t *upipe2; |
60 | if (conflate_[1]) |
61 | upipe2 = new (std::nothrow) upipe_conflate_t (); |
62 | else |
63 | upipe2 = new (std::nothrow) upipe_normal_t (); |
64 | alloc_assert (upipe2); |
65 | |
66 | pipes_[0] = new (std::nothrow) |
67 | pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]); |
68 | alloc_assert (pipes_[0]); |
69 | pipes_[1] = new (std::nothrow) |
70 | pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]); |
71 | alloc_assert (pipes_[1]); |
72 | |
73 | pipes_[0]->set_peer (pipes_[1]); |
74 | pipes_[1]->set_peer (pipes_[0]); |
75 | |
76 | return 0; |
77 | } |
78 | |
79 | void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_) |
80 | { |
81 | zmq::msg_t id; |
82 | const int rc = id.init_size (options_.routing_id_size); |
83 | errno_assert (rc == 0); |
84 | memcpy (id.data (), options_.routing_id, options_.routing_id_size); |
85 | id.set_flags (zmq::msg_t::routing_id); |
86 | const bool written = pipe_->write (&id); |
87 | zmq_assert (written); |
88 | pipe_->flush (); |
89 | } |
90 | |
//  Constructs one end of a pipe pair. Both flow-control directions start
//  out active and the pipe begins in the 'active' state. Note that the
//  high watermark applies to the outbound pipe while the low watermark is
//  derived from the inbound one.
zmq::pipe_t::pipe_t (object_t *parent_,
                     upipe_t *inpipe_,
                     upipe_t *outpipe_,
                     int inhwm_,
                     int outhwm_,
                     bool conflate_) :
    object_t (parent_),
    _in_pipe (inpipe_),
    _out_pipe (outpipe_),
    _in_active (true),
    _out_active (true),
    _hwm (outhwm_),
    _lwm (compute_lwm (inhwm_)),
    _in_hwm_boost (-1),  //  -1 = no boost configured yet
    _out_hwm_boost (-1),
    _msgs_read (0),
    _msgs_written (0),
    _peers_msgs_read (0),
    _peer (NULL),  //  set later via set_peer ()
    _sink (NULL),  //  set later via set_event_sink ()
    _state (active),
    _delay (true),
    _server_socket_routing_id (0),
    _conflate (conflate_)
{
}
117 | |
zmq::pipe_t::~pipe_t ()
{
    //  Intentionally empty. The inbound ypipe is deleted during the
    //  termination handshake (see process_pipe_term_ack), which also
    //  performs 'delete this'; nothing is owned at this point.
}
121 | |
122 | void zmq::pipe_t::set_peer (pipe_t *peer_) |
123 | { |
124 | // Peer can be set once only. |
125 | zmq_assert (!_peer); |
126 | _peer = peer_; |
127 | } |
128 | |
129 | void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) |
130 | { |
131 | // Sink can be set once only. |
132 | zmq_assert (!_sink); |
133 | _sink = sink_; |
134 | } |
135 | |
136 | void zmq::pipe_t::set_server_socket_routing_id ( |
137 | uint32_t server_socket_routing_id_) |
138 | { |
139 | _server_socket_routing_id = server_socket_routing_id_; |
140 | } |
141 | |
142 | uint32_t zmq::pipe_t::get_server_socket_routing_id () const |
143 | { |
144 | return _server_socket_routing_id; |
145 | } |
146 | |
147 | void zmq::pipe_t::set_router_socket_routing_id ( |
148 | const blob_t &router_socket_routing_id_) |
149 | { |
150 | _router_socket_routing_id.set_deep_copy (router_socket_routing_id_); |
151 | } |
152 | |
153 | const zmq::blob_t &zmq::pipe_t::get_routing_id () const |
154 | { |
155 | return _router_socket_routing_id; |
156 | } |
157 | |
158 | bool zmq::pipe_t::check_read () |
159 | { |
160 | if (unlikely (!_in_active)) |
161 | return false; |
162 | if (unlikely (_state != active && _state != waiting_for_delimiter)) |
163 | return false; |
164 | |
165 | // Check if there's an item in the pipe. |
166 | if (!_in_pipe->check_read ()) { |
167 | _in_active = false; |
168 | return false; |
169 | } |
170 | |
171 | // If the next item in the pipe is message delimiter, |
172 | // initiate termination process. |
173 | if (_in_pipe->probe (is_delimiter)) { |
174 | msg_t msg; |
175 | const bool ok = _in_pipe->read (&msg); |
176 | zmq_assert (ok); |
177 | process_delimiter (); |
178 | return false; |
179 | } |
180 | |
181 | return true; |
182 | } |
183 | |
184 | bool zmq::pipe_t::read (msg_t *msg_) |
185 | { |
186 | if (unlikely (!_in_active)) |
187 | return false; |
188 | if (unlikely (_state != active && _state != waiting_for_delimiter)) |
189 | return false; |
190 | |
191 | for (bool payload_read = false; !payload_read;) { |
192 | if (!_in_pipe->read (msg_)) { |
193 | _in_active = false; |
194 | return false; |
195 | } |
196 | |
197 | // If this is a credential, ignore it and receive next message. |
198 | if (unlikely (msg_->is_credential ())) { |
199 | const int rc = msg_->close (); |
200 | zmq_assert (rc == 0); |
201 | } else |
202 | payload_read = true; |
203 | } |
204 | |
205 | // If delimiter was read, start termination process of the pipe. |
206 | if (msg_->is_delimiter ()) { |
207 | process_delimiter (); |
208 | return false; |
209 | } |
210 | |
211 | if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ()) |
212 | _msgs_read++; |
213 | |
214 | if (_lwm > 0 && _msgs_read % _lwm == 0) |
215 | send_activate_write (_peer, _msgs_read); |
216 | |
217 | return true; |
218 | } |
219 | |
220 | bool zmq::pipe_t::check_write () |
221 | { |
222 | if (unlikely (!_out_active || _state != active)) |
223 | return false; |
224 | |
225 | const bool full = !check_hwm (); |
226 | |
227 | if (unlikely (full)) { |
228 | _out_active = false; |
229 | return false; |
230 | } |
231 | |
232 | return true; |
233 | } |
234 | |
235 | bool zmq::pipe_t::write (msg_t *msg_) |
236 | { |
237 | if (unlikely (!check_write ())) |
238 | return false; |
239 | |
240 | const bool more = (msg_->flags () & msg_t::more) != 0; |
241 | const bool is_routing_id = msg_->is_routing_id (); |
242 | _out_pipe->write (*msg_, more); |
243 | if (!more && !is_routing_id) |
244 | _msgs_written++; |
245 | |
246 | return true; |
247 | } |
248 | |
249 | void zmq::pipe_t::rollback () const |
250 | { |
251 | // Remove incomplete message from the outbound pipe. |
252 | msg_t msg; |
253 | if (_out_pipe) { |
254 | while (_out_pipe->unwrite (&msg)) { |
255 | zmq_assert (msg.flags () & msg_t::more); |
256 | const int rc = msg.close (); |
257 | errno_assert (rc == 0); |
258 | } |
259 | } |
260 | } |
261 | |
262 | void zmq::pipe_t::flush () |
263 | { |
264 | // The peer does not exist anymore at this point. |
265 | if (_state == term_ack_sent) |
266 | return; |
267 | |
268 | if (_out_pipe && !_out_pipe->flush ()) |
269 | send_activate_read (_peer); |
270 | } |
271 | |
272 | void zmq::pipe_t::process_activate_read () |
273 | { |
274 | if (!_in_active && (_state == active || _state == waiting_for_delimiter)) { |
275 | _in_active = true; |
276 | _sink->read_activated (this); |
277 | } |
278 | } |
279 | |
280 | void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) |
281 | { |
282 | // Remember the peer's message sequence number. |
283 | _peers_msgs_read = msgs_read_; |
284 | |
285 | if (!_out_active && _state == active) { |
286 | _out_active = true; |
287 | _sink->write_activated (this); |
288 | } |
289 | } |
290 | |
//  Handles the 'hiccup' command from the peer: the peer has replaced its
//  inbound pipe (our outbound pipe), so swap in the new one and discard
//  everything we had written into the old one.
void zmq::pipe_t::process_hiccup (void *pipe_)
{
    //  Destroy old outpipe. Note that the read end of the pipe was already
    //  migrated to this thread.
    zmq_assert (_out_pipe);
    _out_pipe->flush ();
    msg_t msg;
    while (_out_pipe->read (&msg)) {
        //  Undo the flow-control accounting for every complete message
        //  being dropped (parts flagged 'more' were never counted).
        if (!(msg.flags () & msg_t::more))
            _msgs_written--;
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }
    LIBZMQ_DELETE (_out_pipe);

    //  Plug in the new outpipe. The command carries it as void*; it is
    //  the ypipe the peer allocated in hiccup ().
    zmq_assert (pipe_);
    _out_pipe = static_cast<upipe_t *> (pipe_);
    _out_active = true;

    //  If appropriate, notify the user about the hiccup.
    if (_state == active)
        _sink->hiccuped (this);
}
315 | |
//  Handles the 'term' command sent by the peer pipe. Depending on the
//  current state this either waits for pending messages to be drained or
//  acknowledges the termination immediately. In every branch that acks,
//  _out_pipe is dropped first: once the ack is sent the peer owns and
//  deallocates that ypipe.
void zmq::pipe_t::process_pipe_term ()
{
    zmq_assert (_state == active || _state == delimiter_received
                || _state == term_req_sent1);

    //  This is the simple case of peer-induced termination. If there are no
    //  more pending messages to read, or if the pipe was configured to drop
    //  pending messages, we can move directly to the term_ack_sent state.
    //  Otherwise we'll hang up in waiting_for_delimiter state till all
    //  pending messages are read.
    if (_state == active) {
        if (_delay)
            _state = waiting_for_delimiter;
        else {
            _state = term_ack_sent;
            _out_pipe = NULL;
            send_pipe_term_ack (_peer);
        }
    }

    //  Delimiter happened to arrive before the term command. Now we have the
    //  term command as well, so we can move straight to term_ack_sent state.
    else if (_state == delimiter_received) {
        _state = term_ack_sent;
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
    }

    //  This is the case where both ends of the pipe are closed in parallel.
    //  We simply reply to the request by ack and continue waiting for our
    //  own ack.
    else if (_state == term_req_sent1) {
        _state = term_req_sent2;
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
    }
}
353 | |
//  Handles the 'term ack' command: the final step of the termination
//  handshake. WARNING: ends with 'delete this' -- the pipe object must
//  not be touched after this command is processed.
void zmq::pipe_t::process_pipe_term_ack ()
{
    //  Notify the user that all the references to the pipe should be dropped.
    zmq_assert (_sink);
    _sink->pipe_terminated (this);

    //  In term_ack_sent and term_req_sent2 states there's nothing to do.
    //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
    //  the peer before deallocating this side of the pipe.
    //  All the other states are invalid.
    if (_state == term_req_sent1) {
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
    } else
        zmq_assert (_state == term_ack_sent || _state == term_req_sent2);

    //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
    //  pipe (which is an inbound pipe from its point of view).
    //  First, delete all the unread messages in the pipe. We have to do it by
    //  hand because msg_t doesn't have automatic destructor. Then deallocate
    //  the ypipe itself.

    //  For conflating pipes the drain is skipped; see the _conflate flag
    //  set at construction time.
    if (!_conflate) {
        msg_t msg;
        while (_in_pipe->read (&msg)) {
            const int rc = msg.close ();
            errno_assert (rc == 0);
        }
    }

    LIBZMQ_DELETE (_in_pipe);

    //  Deallocate the pipe object
    delete this;
}
389 | |
390 | void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_) |
391 | { |
392 | set_hwms (inhwm_, outhwm_); |
393 | } |
394 | |
395 | void zmq::pipe_t::set_nodelay () |
396 | { |
397 | this->_delay = false; |
398 | } |
399 | |
//  Starts (or continues) asynchronous termination of the pipe. delay_
//  overrides the flag given at creation: when true, pending inbound
//  messages are drained before the pipe dies; when false they are
//  dropped (see process_pipe_term).
void zmq::pipe_t::terminate (bool delay_)
{
    //  Overload the value specified at pipe creation.
    _delay = delay_;

    //  If terminate was already called, we can ignore the duplicate invocation.
    if (_state == term_req_sent1 || _state == term_req_sent2) {
        return;
    }
    //  If the pipe is in the final phase of async termination, it's going to
    //  closed anyway. No need to do anything special here.
    if (_state == term_ack_sent) {
        return;
    }
    //  The simple sync termination case. Ask the peer to terminate and wait
    //  for the ack.
    if (_state == active) {
        send_pipe_term (_peer);
        _state = term_req_sent1;
    }
    //  There are still pending messages available, but the user calls
    //  'terminate'. We can act as if all the pending messages were read.
    else if (_state == waiting_for_delimiter && !_delay) {
        //  Drop any unfinished outbound messages.
        rollback ();
        _out_pipe = NULL;
        send_pipe_term_ack (_peer);
        _state = term_ack_sent;
    }
    //  If there are pending messages still available, do nothing.
    else if (_state == waiting_for_delimiter) {
    }
    //  We've already got delimiter, but not term command yet. We can ignore
    //  the delimiter and ack synchronously terminate as if we were in
    //  active state.
    else if (_state == delimiter_received) {
        send_pipe_term (_peer);
        _state = term_req_sent1;
    }
    //  There are no other states.
    else {
        zmq_assert (false);
    }

    //  Stop outbound flow of messages.
    _out_active = false;

    if (_out_pipe) {
        //  Drop any unfinished outbound messages.
        rollback ();

        //  Write the delimiter into the pipe. Note that watermarks are not
        //  checked; thus the delimiter can be written even when the pipe is full.
        msg_t msg;
        msg.init_delimiter ();
        _out_pipe->write (msg, false);
        flush ();
    }
}
459 | |
460 | bool zmq::pipe_t::is_delimiter (const msg_t &msg_) |
461 | { |
462 | return msg_.is_delimiter (); |
463 | } |
464 | |
465 | int zmq::pipe_t::compute_lwm (int hwm_) |
466 | { |
467 | // Compute the low water mark. Following point should be taken |
468 | // into consideration: |
469 | // |
470 | // 1. LWM has to be less than HWM. |
471 | // 2. LWM cannot be set to very low value (such as zero) as after filling |
472 | // the queue it would start to refill only after all the messages are |
473 | // read from it and thus unnecessarily hold the progress back. |
474 | // 3. LWM cannot be set to very high value (such as HWM-1) as it would |
475 | // result in lock-step filling of the queue - if a single message is |
476 | // read from a full queue, writer thread is resumed to write exactly one |
477 | // message to the queue and go back to sleep immediately. This would |
478 | // result in low performance. |
479 | // |
480 | // Given the 3. it would be good to keep HWM and LWM as far apart as |
481 | // possible to reduce the thread switching overhead to almost zero. |
482 | // Let's make LWM 1/2 of HWM. |
483 | const int result = (hwm_ + 1) / 2; |
484 | |
485 | return result; |
486 | } |
487 | |
488 | void zmq::pipe_t::process_delimiter () |
489 | { |
490 | zmq_assert (_state == active || _state == waiting_for_delimiter); |
491 | |
492 | if (_state == active) |
493 | _state = delimiter_received; |
494 | else { |
495 | _out_pipe = NULL; |
496 | send_pipe_term_ack (_peer); |
497 | _state = term_ack_sent; |
498 | } |
499 | } |
500 | |
501 | void zmq::pipe_t::hiccup () |
502 | { |
503 | // If termination is already under way do nothing. |
504 | if (_state != active) |
505 | return; |
506 | |
507 | // We'll drop the pointer to the inpipe. From now on, the peer is |
508 | // responsible for deallocating it. |
509 | |
510 | // Create new inpipe. |
511 | _in_pipe = |
512 | _conflate |
513 | ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ()) |
514 | : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> (); |
515 | |
516 | alloc_assert (_in_pipe); |
517 | _in_active = true; |
518 | |
519 | // Notify the peer about the hiccup. |
520 | send_hiccup (_peer, _in_pipe); |
521 | } |
522 | |
//  Recomputes the watermarks from the requested values plus any
//  configured boost (see set_hwms_boost). Internally 0 represents an
//  infinite watermark.
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{
    //  Negative boost values (-1 = unset) are ignored via the max with 0.
    int in = inhwm_ + std::max (_in_hwm_boost, 0);
    int out = outhwm_ + std::max (_out_hwm_boost, 0);

    // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
    //  NOTE(review): a boost of exactly 0 also forces the infinite
    //  watermark -- presumably deliberate; confirm against callers of
    //  set_hwms_boost.
    if (inhwm_ <= 0 || _in_hwm_boost == 0)
        in = 0;

    if (outhwm_ <= 0 || _out_hwm_boost == 0)
        out = 0;

    //  Inbound hwm drives our low watermark; outbound hwm is enforced by
    //  check_hwm ().
    _lwm = compute_lwm (in);
    _hwm = out;
}
538 | |
539 | void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_) |
540 | { |
541 | _in_hwm_boost = inhwmboost_; |
542 | _out_hwm_boost = outhwmboost_; |
543 | } |
544 | |
545 | bool zmq::pipe_t::check_hwm () const |
546 | { |
547 | const bool full = |
548 | _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm); |
549 | return !full; |
550 | } |
551 | |
552 | void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_) |
553 | { |
554 | send_pipe_hwm (_peer, inhwm_, outhwm_); |
555 | } |
556 | |
557 | void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_) |
558 | { |
559 | _endpoint_pair = ZMQ_MOVE (endpoint_pair_); |
560 | } |
561 | |
562 | const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const |
563 | { |
564 | return _endpoint_pair; |
565 | } |
566 | |
567 | void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_) |
568 | { |
569 | endpoint_uri_pair_t *ep = |
570 | new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair); |
571 | send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_, |
572 | ep); |
573 | } |
574 | |
575 | void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_, |
576 | own_t *socket_base_, |
577 | endpoint_uri_pair_t *endpoint_pair_) |
578 | { |
579 | send_pipe_stats_publish (socket_base_, queue_count_, |
580 | _msgs_written - _peers_msgs_read, endpoint_pair_); |
581 | } |
582 | |