1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "own.hpp" |
32 | #include "err.hpp" |
33 | #include "io_thread.hpp" |
34 | |
35 | zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) : |
36 | object_t (parent_, tid_), |
37 | _terminating (false), |
38 | _sent_seqnum (0), |
39 | _processed_seqnum (0), |
40 | _owner (NULL), |
41 | _term_acks (0) |
42 | { |
43 | } |
44 | |
45 | zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : |
46 | object_t (io_thread_), |
47 | options (options_), |
48 | _terminating (false), |
49 | _sent_seqnum (0), |
50 | _processed_seqnum (0), |
51 | _owner (NULL), |
52 | _term_acks (0) |
53 | { |
54 | } |
55 | |
56 | zmq::own_t::~own_t () |
57 | { |
58 | } |
59 | |
60 | void zmq::own_t::set_owner (own_t *owner_) |
61 | { |
62 | zmq_assert (!_owner); |
63 | _owner = owner_; |
64 | } |
65 | |
66 | void zmq::own_t::inc_seqnum () |
67 | { |
68 | // This function may be called from a different thread! |
69 | _sent_seqnum.add (1); |
70 | } |
71 | |
72 | void zmq::own_t::process_seqnum () |
73 | { |
74 | // Catch up with counter of processed commands. |
75 | _processed_seqnum++; |
76 | |
77 | // We may have catched up and still have pending terms acks. |
78 | check_term_acks (); |
79 | } |
80 | |
81 | void zmq::own_t::launch_child (own_t *object_) |
82 | { |
83 | // Specify the owner of the object. |
84 | object_->set_owner (this); |
85 | |
86 | // Plug the object into the I/O thread. |
87 | send_plug (object_); |
88 | |
89 | // Take ownership of the object. |
90 | send_own (this, object_); |
91 | } |
92 | |
93 | void zmq::own_t::term_child (own_t *object_) |
94 | { |
95 | process_term_req (object_); |
96 | } |
97 | |
98 | void zmq::own_t::process_term_req (own_t *object_) |
99 | { |
100 | // When shutting down we can ignore termination requests from owned |
101 | // objects. The termination request was already sent to the object. |
102 | if (_terminating) |
103 | return; |
104 | |
105 | // If not found, we assume that termination request was already sent to |
106 | // the object so we can safely ignore the request. |
107 | if (0 == _owned.erase (object_)) |
108 | return; |
109 | |
110 | // If I/O object is well and alive let's ask it to terminate. |
111 | register_term_acks (1); |
112 | |
113 | // Note that this object is the root of the (partial shutdown) thus, its |
114 | // value of linger is used, rather than the value stored by the children. |
115 | send_term (object_, options.linger.load ()); |
116 | } |
117 | |
118 | void zmq::own_t::process_own (own_t *object_) |
119 | { |
120 | // If the object is already being shut down, new owned objects are |
121 | // immediately asked to terminate. Note that linger is set to zero. |
122 | if (_terminating) { |
123 | register_term_acks (1); |
124 | send_term (object_, 0); |
125 | return; |
126 | } |
127 | |
128 | // Store the reference to the owned object. |
129 | _owned.insert (object_); |
130 | } |
131 | |
132 | void zmq::own_t::terminate () |
133 | { |
134 | // If termination is already underway, there's no point |
135 | // in starting it anew. |
136 | if (_terminating) |
137 | return; |
138 | |
139 | // As for the root of the ownership tree, there's no one to terminate it, |
140 | // so it has to terminate itself. |
141 | if (!_owner) { |
142 | process_term (options.linger.load ()); |
143 | return; |
144 | } |
145 | |
146 | // If I am an owned object, I'll ask my owner to terminate me. |
147 | send_term_req (_owner, this); |
148 | } |
149 | |
150 | bool zmq::own_t::is_terminating () |
151 | { |
152 | return _terminating; |
153 | } |
154 | |
155 | void zmq::own_t::process_term (int linger_) |
156 | { |
157 | // Double termination should never happen. |
158 | zmq_assert (!_terminating); |
159 | |
160 | // Send termination request to all owned objects. |
161 | for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end; |
162 | ++it) |
163 | send_term (*it, linger_); |
164 | register_term_acks (static_cast<int> (_owned.size ())); |
165 | _owned.clear (); |
166 | |
167 | // Start termination process and check whether by chance we cannot |
168 | // terminate immediately. |
169 | _terminating = true; |
170 | check_term_acks (); |
171 | } |
172 | |
173 | void zmq::own_t::register_term_acks (int count_) |
174 | { |
175 | _term_acks += count_; |
176 | } |
177 | |
178 | void zmq::own_t::unregister_term_ack () |
179 | { |
180 | zmq_assert (_term_acks > 0); |
181 | _term_acks--; |
182 | |
183 | // This may be a last ack we are waiting for before termination... |
184 | check_term_acks (); |
185 | } |
186 | |
187 | void zmq::own_t::process_term_ack () |
188 | { |
189 | unregister_term_ack (); |
190 | } |
191 | |
192 | void zmq::own_t::check_term_acks () |
193 | { |
194 | if (_terminating && _processed_seqnum == _sent_seqnum.get () |
195 | && _term_acks == 0) { |
196 | // Sanity check. There should be no active children at this point. |
197 | zmq_assert (_owned.empty ()); |
198 | |
199 | // The root object has nobody to confirm the termination to. |
200 | // Other nodes will confirm the termination to the owner. |
201 | if (_owner) |
202 | send_term_ack (_owner); |
203 | |
204 | // Deallocate the resources. |
205 | process_destroy (); |
206 | } |
207 | } |
208 | |
209 | void zmq::own_t::process_destroy () |
210 | { |
211 | delete this; |
212 | } |
213 | |