1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
31#define __ZMQ_OBJECT_HPP_INCLUDED__
32
33#include <string>
34
35#include "endpoint.hpp"
36#include "macros.hpp"
37#include "stdint.hpp"
38
39namespace zmq
40{
41struct i_engine;
42struct endpoint_t;
43struct pending_connection_t;
44struct command_t;
45class ctx_t;
46class pipe_t;
47class socket_base_t;
48class session_base_t;
49class io_thread_t;
50class own_t;
51
52// Base class for all objects that participate in inter-thread
53// communication.
54
55class object_t
56{
57 public:
58 object_t (zmq::ctx_t *ctx_, uint32_t tid_);
59 object_t (object_t *parent_);
60 virtual ~object_t ();
61
62 uint32_t get_tid ();
63 void set_tid (uint32_t id_);
64 ctx_t *get_ctx ();
65 void process_command (zmq::command_t &cmd_);
66 void send_inproc_connected (zmq::socket_base_t *socket_);
67 void send_bind (zmq::own_t *destination_,
68 zmq::pipe_t *pipe_,
69 bool inc_seqnum_ = true);
70
71 protected:
72 // Using following function, socket is able to access global
73 // repository of inproc endpoints.
74 int register_endpoint (const char *addr_, const zmq::endpoint_t &endpoint_);
75 int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
76 void unregister_endpoints (zmq::socket_base_t *socket_);
77 zmq::endpoint_t find_endpoint (const char *addr_);
78 void pend_connection (const std::string &addr_,
79 const endpoint_t &endpoint_,
80 pipe_t **pipes_);
81 void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
82
83 void destroy_socket (zmq::socket_base_t *socket_);
84
85 // Logs an message.
86 void log (const char *format_, ...);
87
88 // Chooses least loaded I/O thread.
89 zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
90
91 // Derived object can use these functions to send commands
92 // to other objects.
93 void send_stop ();
94 void send_plug (zmq::own_t *destination_, bool inc_seqnum_ = true);
95 void send_own (zmq::own_t *destination_, zmq::own_t *object_);
96 void send_attach (zmq::session_base_t *destination_,
97 zmq::i_engine *engine_,
98 bool inc_seqnum_ = true);
99 void send_activate_read (zmq::pipe_t *destination_);
100 void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_);
101 void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
102 void send_pipe_peer_stats (zmq::pipe_t *destination_,
103 uint64_t queue_count_,
104 zmq::own_t *socket_base,
105 endpoint_uri_pair_t *endpoint_pair_);
106 void send_pipe_stats_publish (zmq::own_t *destination_,
107 uint64_t outbound_queue_count_,
108 uint64_t inbound_queue_count_,
109 endpoint_uri_pair_t *endpoint_pair_);
110 void send_pipe_term (zmq::pipe_t *destination_);
111 void send_pipe_term_ack (zmq::pipe_t *destination_);
112 void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
113 void send_term_req (zmq::own_t *destination_, zmq::own_t *object_);
114 void send_term (zmq::own_t *destination_, int linger_);
115 void send_term_ack (zmq::own_t *destination_);
116 void send_term_endpoint (own_t *destination_, std::string *endpoint_);
117 void send_reap (zmq::socket_base_t *socket_);
118 void send_reaped ();
119 void send_done ();
120
121 // These handlers can be overridden by the derived objects. They are
122 // called when command arrives from another thread.
123 virtual void process_stop ();
124 virtual void process_plug ();
125 virtual void process_own (zmq::own_t *object_);
126 virtual void process_attach (zmq::i_engine *engine_);
127 virtual void process_bind (zmq::pipe_t *pipe_);
128 virtual void process_activate_read ();
129 virtual void process_activate_write (uint64_t msgs_read_);
130 virtual void process_hiccup (void *pipe_);
131 virtual void process_pipe_peer_stats (uint64_t queue_count_,
132 zmq::own_t *socket_base_,
133 endpoint_uri_pair_t *endpoint_pair_);
134 virtual void
135 process_pipe_stats_publish (uint64_t outbound_queue_count_,
136 uint64_t inbound_queue_count_,
137 endpoint_uri_pair_t *endpoint_pair_);
138 virtual void process_pipe_term ();
139 virtual void process_pipe_term_ack ();
140 virtual void process_pipe_hwm (int inhwm_, int outhwm_);
141 virtual void process_term_req (zmq::own_t *object_);
142 virtual void process_term (int linger_);
143 virtual void process_term_ack ();
144 virtual void process_term_endpoint (std::string *endpoint_);
145 virtual void process_reap (zmq::socket_base_t *socket_);
146 virtual void process_reaped ();
147
148 // Special handler called after a command that requires a seqnum
149 // was processed. The implementation should catch up with its counter
150 // of processed commands here.
151 virtual void process_seqnum ();
152
153 private:
154 // Context provides access to the global state.
155 zmq::ctx_t *const _ctx;
156
157 // Thread ID of the thread the object belongs to.
158 uint32_t _tid;
159
160 void send_command (command_t &cmd_);
161
162 ZMQ_NON_COPYABLE_NOR_MOVABLE (object_t)
163};
164}
165
166#endif
167