1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_OBJECT_HPP_INCLUDED__ |
31 | #define __ZMQ_OBJECT_HPP_INCLUDED__ |
32 | |
33 | #include <string> |
34 | |
35 | #include "endpoint.hpp" |
36 | #include "macros.hpp" |
37 | #include "stdint.hpp" |
38 | |
39 | namespace zmq |
40 | { |
//  Forward declarations. The declarations in this header do not require
//  the complete definitions of these types, so the corresponding headers
//  are not included here.

//  Types declared with the 'class' key.
class ctx_t;
class io_thread_t;
class own_t;
class pipe_t;
class session_base_t;
class socket_base_t;

//  Types declared with the 'struct' key.
struct command_t;
struct endpoint_t;
struct i_engine;
struct pending_connection_t;
51 | |
52 | // Base class for all objects that participate in inter-thread |
53 | // communication. |
54 | |
55 | class object_t |
56 | { |
57 | public: |
58 | object_t (zmq::ctx_t *ctx_, uint32_t tid_); |
59 | object_t (object_t *parent_); |
60 | virtual ~object_t (); |
61 | |
62 | uint32_t get_tid (); |
63 | void set_tid (uint32_t id_); |
64 | ctx_t *get_ctx (); |
65 | void process_command (zmq::command_t &cmd_); |
66 | void send_inproc_connected (zmq::socket_base_t *socket_); |
67 | void send_bind (zmq::own_t *destination_, |
68 | zmq::pipe_t *pipe_, |
69 | bool inc_seqnum_ = true); |
70 | |
71 | protected: |
72 | // Using following function, socket is able to access global |
73 | // repository of inproc endpoints. |
74 | int register_endpoint (const char *addr_, const zmq::endpoint_t &endpoint_); |
75 | int unregister_endpoint (const std::string &addr_, socket_base_t *socket_); |
76 | void unregister_endpoints (zmq::socket_base_t *socket_); |
77 | zmq::endpoint_t find_endpoint (const char *addr_); |
78 | void pend_connection (const std::string &addr_, |
79 | const endpoint_t &endpoint_, |
80 | pipe_t **pipes_); |
81 | void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); |
82 | |
83 | void destroy_socket (zmq::socket_base_t *socket_); |
84 | |
85 | // Logs an message. |
86 | void log (const char *format_, ...); |
87 | |
88 | // Chooses least loaded I/O thread. |
89 | zmq::io_thread_t *choose_io_thread (uint64_t affinity_); |
90 | |
91 | // Derived object can use these functions to send commands |
92 | // to other objects. |
93 | void send_stop (); |
94 | void send_plug (zmq::own_t *destination_, bool inc_seqnum_ = true); |
95 | void send_own (zmq::own_t *destination_, zmq::own_t *object_); |
96 | void send_attach (zmq::session_base_t *destination_, |
97 | zmq::i_engine *engine_, |
98 | bool inc_seqnum_ = true); |
99 | void send_activate_read (zmq::pipe_t *destination_); |
100 | void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); |
101 | void send_hiccup (zmq::pipe_t *destination_, void *pipe_); |
102 | void send_pipe_peer_stats (zmq::pipe_t *destination_, |
103 | uint64_t queue_count_, |
104 | zmq::own_t *socket_base, |
105 | endpoint_uri_pair_t *endpoint_pair_); |
106 | void send_pipe_stats_publish (zmq::own_t *destination_, |
107 | uint64_t outbound_queue_count_, |
108 | uint64_t inbound_queue_count_, |
109 | endpoint_uri_pair_t *endpoint_pair_); |
110 | void send_pipe_term (zmq::pipe_t *destination_); |
111 | void send_pipe_term_ack (zmq::pipe_t *destination_); |
112 | void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_); |
113 | void send_term_req (zmq::own_t *destination_, zmq::own_t *object_); |
114 | void send_term (zmq::own_t *destination_, int linger_); |
115 | void send_term_ack (zmq::own_t *destination_); |
116 | void send_term_endpoint (own_t *destination_, std::string *endpoint_); |
117 | void send_reap (zmq::socket_base_t *socket_); |
118 | void send_reaped (); |
119 | void send_done (); |
120 | |
121 | // These handlers can be overridden by the derived objects. They are |
122 | // called when command arrives from another thread. |
123 | virtual void process_stop (); |
124 | virtual void process_plug (); |
125 | virtual void process_own (zmq::own_t *object_); |
126 | virtual void process_attach (zmq::i_engine *engine_); |
127 | virtual void process_bind (zmq::pipe_t *pipe_); |
128 | virtual void process_activate_read (); |
129 | virtual void process_activate_write (uint64_t msgs_read_); |
130 | virtual void process_hiccup (void *pipe_); |
131 | virtual void process_pipe_peer_stats (uint64_t queue_count_, |
132 | zmq::own_t *socket_base_, |
133 | endpoint_uri_pair_t *endpoint_pair_); |
134 | virtual void |
135 | process_pipe_stats_publish (uint64_t outbound_queue_count_, |
136 | uint64_t inbound_queue_count_, |
137 | endpoint_uri_pair_t *endpoint_pair_); |
138 | virtual void process_pipe_term (); |
139 | virtual void process_pipe_term_ack (); |
140 | virtual void process_pipe_hwm (int inhwm_, int outhwm_); |
141 | virtual void process_term_req (zmq::own_t *object_); |
142 | virtual void process_term (int linger_); |
143 | virtual void process_term_ack (); |
144 | virtual void process_term_endpoint (std::string *endpoint_); |
145 | virtual void process_reap (zmq::socket_base_t *socket_); |
146 | virtual void process_reaped (); |
147 | |
148 | // Special handler called after a command that requires a seqnum |
149 | // was processed. The implementation should catch up with its counter |
150 | // of processed commands here. |
151 | virtual void process_seqnum (); |
152 | |
153 | private: |
154 | // Context provides access to the global state. |
155 | zmq::ctx_t *const _ctx; |
156 | |
157 | // Thread ID of the thread the object belongs to. |
158 | uint32_t _tid; |
159 | |
160 | void send_command (command_t &cmd_); |
161 | |
162 | ZMQ_NON_COPYABLE_NOR_MOVABLE (object_t) |
163 | }; |
164 | } |
165 | |
166 | #endif |
167 | |