1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <string.h>
32#include <stdarg.h>
33
34#include "object.hpp"
35#include "ctx.hpp"
36#include "err.hpp"
37#include "pipe.hpp"
38#include "io_thread.hpp"
39#include "session_base.hpp"
40#include "socket_base.hpp"
41
42zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : _ctx (ctx_), _tid (tid_)
43{
44}
45
46zmq::object_t::object_t (object_t *parent_) :
47 _ctx (parent_->_ctx),
48 _tid (parent_->_tid)
49{
50}
51
52zmq::object_t::~object_t ()
53{
54}
55
56uint32_t zmq::object_t::get_tid ()
57{
58 return _tid;
59}
60
61void zmq::object_t::set_tid (uint32_t id_)
62{
63 _tid = id_;
64}
65
66zmq::ctx_t *zmq::object_t::get_ctx ()
67{
68 return _ctx;
69}
70
71void zmq::object_t::process_command (command_t &cmd_)
72{
73 switch (cmd_.type) {
74 case command_t::activate_read:
75 process_activate_read ();
76 break;
77
78 case command_t::activate_write:
79 process_activate_write (cmd_.args.activate_write.msgs_read);
80 break;
81
82 case command_t::stop:
83 process_stop ();
84 break;
85
86 case command_t::plug:
87 process_plug ();
88 process_seqnum ();
89 break;
90
91 case command_t::own:
92 process_own (cmd_.args.own.object);
93 process_seqnum ();
94 break;
95
96 case command_t::attach:
97 process_attach (cmd_.args.attach.engine);
98 process_seqnum ();
99 break;
100
101 case command_t::bind:
102 process_bind (cmd_.args.bind.pipe);
103 process_seqnum ();
104 break;
105
106 case command_t::hiccup:
107 process_hiccup (cmd_.args.hiccup.pipe);
108 break;
109
110 case command_t::pipe_peer_stats:
111 process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
112 cmd_.args.pipe_peer_stats.socket_base,
113 cmd_.args.pipe_peer_stats.endpoint_pair);
114 break;
115
116 case command_t::pipe_stats_publish:
117 process_pipe_stats_publish (
118 cmd_.args.pipe_stats_publish.outbound_queue_count,
119 cmd_.args.pipe_stats_publish.inbound_queue_count,
120 cmd_.args.pipe_stats_publish.endpoint_pair);
121 break;
122
123 case command_t::pipe_term:
124 process_pipe_term ();
125 break;
126
127 case command_t::pipe_term_ack:
128 process_pipe_term_ack ();
129 break;
130
131 case command_t::pipe_hwm:
132 process_pipe_hwm (cmd_.args.pipe_hwm.inhwm,
133 cmd_.args.pipe_hwm.outhwm);
134 break;
135
136 case command_t::term_req:
137 process_term_req (cmd_.args.term_req.object);
138 break;
139
140 case command_t::term:
141 process_term (cmd_.args.term.linger);
142 break;
143
144 case command_t::term_ack:
145 process_term_ack ();
146 break;
147
148 case command_t::term_endpoint:
149 process_term_endpoint (cmd_.args.term_endpoint.endpoint);
150 break;
151
152 case command_t::reap:
153 process_reap (cmd_.args.reap.socket);
154 break;
155
156 case command_t::reaped:
157 process_reaped ();
158 break;
159
160 case command_t::inproc_connected:
161 process_seqnum ();
162 break;
163
164 case command_t::done:
165 default:
166 zmq_assert (false);
167 }
168}
169
170int zmq::object_t::register_endpoint (const char *addr_,
171 const endpoint_t &endpoint_)
172{
173 return _ctx->register_endpoint (addr_, endpoint_);
174}
175
176int zmq::object_t::unregister_endpoint (const std::string &addr_,
177 socket_base_t *socket_)
178{
179 return _ctx->unregister_endpoint (addr_, socket_);
180}
181
182void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
183{
184 return _ctx->unregister_endpoints (socket_);
185}
186
187zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
188{
189 return _ctx->find_endpoint (addr_);
190}
191
192void zmq::object_t::pend_connection (const std::string &addr_,
193 const endpoint_t &endpoint_,
194 pipe_t **pipes_)
195{
196 _ctx->pend_connection (addr_, endpoint_, pipes_);
197}
198
199void zmq::object_t::connect_pending (const char *addr_,
200 zmq::socket_base_t *bind_socket_)
201{
202 return _ctx->connect_pending (addr_, bind_socket_);
203}
204
205void zmq::object_t::destroy_socket (socket_base_t *socket_)
206{
207 _ctx->destroy_socket (socket_);
208}
209
210zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
211{
212 return _ctx->choose_io_thread (affinity_);
213}
214
215void zmq::object_t::send_stop ()
216{
217 // 'stop' command goes always from administrative thread to
218 // the current object.
219 command_t cmd;
220 cmd.destination = this;
221 cmd.type = command_t::stop;
222 _ctx->send_command (_tid, cmd);
223}
224
225void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
226{
227 if (inc_seqnum_)
228 destination_->inc_seqnum ();
229
230 command_t cmd;
231 cmd.destination = destination_;
232 cmd.type = command_t::plug;
233 send_command (cmd);
234}
235
236void zmq::object_t::send_own (own_t *destination_, own_t *object_)
237{
238 destination_->inc_seqnum ();
239 command_t cmd;
240 cmd.destination = destination_;
241 cmd.type = command_t::own;
242 cmd.args.own.object = object_;
243 send_command (cmd);
244}
245
246void zmq::object_t::send_attach (session_base_t *destination_,
247 i_engine *engine_,
248 bool inc_seqnum_)
249{
250 if (inc_seqnum_)
251 destination_->inc_seqnum ();
252
253 command_t cmd;
254 cmd.destination = destination_;
255 cmd.type = command_t::attach;
256 cmd.args.attach.engine = engine_;
257 send_command (cmd);
258}
259
260void zmq::object_t::send_bind (own_t *destination_,
261 pipe_t *pipe_,
262 bool inc_seqnum_)
263{
264 if (inc_seqnum_)
265 destination_->inc_seqnum ();
266
267 command_t cmd;
268 cmd.destination = destination_;
269 cmd.type = command_t::bind;
270 cmd.args.bind.pipe = pipe_;
271 send_command (cmd);
272}
273
274void zmq::object_t::send_activate_read (pipe_t *destination_)
275{
276 command_t cmd;
277 cmd.destination = destination_;
278 cmd.type = command_t::activate_read;
279 send_command (cmd);
280}
281
282void zmq::object_t::send_activate_write (pipe_t *destination_,
283 uint64_t msgs_read_)
284{
285 command_t cmd;
286 cmd.destination = destination_;
287 cmd.type = command_t::activate_write;
288 cmd.args.activate_write.msgs_read = msgs_read_;
289 send_command (cmd);
290}
291
292void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
293{
294 command_t cmd;
295 cmd.destination = destination_;
296 cmd.type = command_t::hiccup;
297 cmd.args.hiccup.pipe = pipe_;
298 send_command (cmd);
299}
300
301void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
302 uint64_t queue_count_,
303 own_t *socket_base_,
304 endpoint_uri_pair_t *endpoint_pair_)
305{
306 command_t cmd;
307 cmd.destination = destination_;
308 cmd.type = command_t::pipe_peer_stats;
309 cmd.args.pipe_peer_stats.queue_count = queue_count_;
310 cmd.args.pipe_peer_stats.socket_base = socket_base_;
311 cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
312 send_command (cmd);
313}
314
315void zmq::object_t::send_pipe_stats_publish (
316 own_t *destination_,
317 uint64_t outbound_queue_count_,
318 uint64_t inbound_queue_count_,
319 endpoint_uri_pair_t *endpoint_pair_)
320{
321 command_t cmd;
322 cmd.destination = destination_;
323 cmd.type = command_t::pipe_stats_publish;
324 cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
325 cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
326 cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
327 send_command (cmd);
328}
329
330void zmq::object_t::send_pipe_term (pipe_t *destination_)
331{
332 command_t cmd;
333 cmd.destination = destination_;
334 cmd.type = command_t::pipe_term;
335 send_command (cmd);
336}
337
338void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
339{
340 command_t cmd;
341 cmd.destination = destination_;
342 cmd.type = command_t::pipe_term_ack;
343 send_command (cmd);
344}
345
346void zmq::object_t::send_pipe_hwm (pipe_t *destination_,
347 int inhwm_,
348 int outhwm_)
349{
350 command_t cmd;
351 cmd.destination = destination_;
352 cmd.type = command_t::pipe_hwm;
353 cmd.args.pipe_hwm.inhwm = inhwm_;
354 cmd.args.pipe_hwm.outhwm = outhwm_;
355 send_command (cmd);
356}
357
358void zmq::object_t::send_term_req (own_t *destination_, own_t *object_)
359{
360 command_t cmd;
361 cmd.destination = destination_;
362 cmd.type = command_t::term_req;
363 cmd.args.term_req.object = object_;
364 send_command (cmd);
365}
366
367void zmq::object_t::send_term (own_t *destination_, int linger_)
368{
369 command_t cmd;
370 cmd.destination = destination_;
371 cmd.type = command_t::term;
372 cmd.args.term.linger = linger_;
373 send_command (cmd);
374}
375
376void zmq::object_t::send_term_ack (own_t *destination_)
377{
378 command_t cmd;
379 cmd.destination = destination_;
380 cmd.type = command_t::term_ack;
381 send_command (cmd);
382}
383
384void zmq::object_t::send_term_endpoint (own_t *destination_,
385 std::string *endpoint_)
386{
387 command_t cmd;
388 cmd.destination = destination_;
389 cmd.type = command_t::term_endpoint;
390 cmd.args.term_endpoint.endpoint = endpoint_;
391 send_command (cmd);
392}
393
394void zmq::object_t::send_reap (class socket_base_t *socket_)
395{
396 command_t cmd;
397 cmd.destination = _ctx->get_reaper ();
398 cmd.type = command_t::reap;
399 cmd.args.reap.socket = socket_;
400 send_command (cmd);
401}
402
403void zmq::object_t::send_reaped ()
404{
405 command_t cmd;
406 cmd.destination = _ctx->get_reaper ();
407 cmd.type = command_t::reaped;
408 send_command (cmd);
409}
410
411void zmq::object_t::send_inproc_connected (zmq::socket_base_t *socket_)
412{
413 command_t cmd;
414 cmd.destination = socket_;
415 cmd.type = command_t::inproc_connected;
416 send_command (cmd);
417}
418
419void zmq::object_t::send_done ()
420{
421 command_t cmd;
422 cmd.destination = NULL;
423 cmd.type = command_t::done;
424 _ctx->send_command (ctx_t::term_tid, cmd);
425}
426
427void zmq::object_t::process_stop ()
428{
429 zmq_assert (false);
430}
431
432void zmq::object_t::process_plug ()
433{
434 zmq_assert (false);
435}
436
437void zmq::object_t::process_own (own_t *)
438{
439 zmq_assert (false);
440}
441
442void zmq::object_t::process_attach (i_engine *)
443{
444 zmq_assert (false);
445}
446
447void zmq::object_t::process_bind (pipe_t *)
448{
449 zmq_assert (false);
450}
451
452void zmq::object_t::process_activate_read ()
453{
454 zmq_assert (false);
455}
456
457void zmq::object_t::process_activate_write (uint64_t)
458{
459 zmq_assert (false);
460}
461
462void zmq::object_t::process_hiccup (void *)
463{
464 zmq_assert (false);
465}
466
467void zmq::object_t::process_pipe_peer_stats (uint64_t,
468 own_t *,
469 endpoint_uri_pair_t *)
470{
471 zmq_assert (false);
472}
473
474void zmq::object_t::process_pipe_stats_publish (uint64_t,
475 uint64_t,
476 endpoint_uri_pair_t *)
477{
478 zmq_assert (false);
479}
480
481void zmq::object_t::process_pipe_term ()
482{
483 zmq_assert (false);
484}
485
486void zmq::object_t::process_pipe_term_ack ()
487{
488 zmq_assert (false);
489}
490
491void zmq::object_t::process_pipe_hwm (int, int)
492{
493 zmq_assert (false);
494}
495
496void zmq::object_t::process_term_req (own_t *)
497{
498 zmq_assert (false);
499}
500
501void zmq::object_t::process_term (int)
502{
503 zmq_assert (false);
504}
505
506void zmq::object_t::process_term_ack ()
507{
508 zmq_assert (false);
509}
510
511void zmq::object_t::process_term_endpoint (std::string *)
512{
513 zmq_assert (false);
514}
515
516void zmq::object_t::process_reap (class socket_base_t *)
517{
518 zmq_assert (false);
519}
520
521void zmq::object_t::process_reaped ()
522{
523 zmq_assert (false);
524}
525
526void zmq::object_t::process_seqnum ()
527{
528 zmq_assert (false);
529}
530
531void zmq::object_t::send_command (command_t &cmd_)
532{
533 _ctx->send_command (cmd_.destination->get_tid (), cmd_);
534}
535