1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
31#define __ZMQ_SESSION_BASE_HPP_INCLUDED__
32
33#include <stdarg.h>
34
35#include "own.hpp"
36#include "io_object.hpp"
37#include "pipe.hpp"
38#include "socket_base.hpp"
39#include "i_engine.hpp"
40#include "msg.hpp"
41
42namespace zmq
43{
44class io_thread_t;
45struct i_engine;
46struct address_t;
47
48class session_base_t : public own_t, public io_object_t, public i_pipe_events
49{
50 public:
51 // Create a session of the particular type.
52 static session_base_t *create (zmq::io_thread_t *io_thread_,
53 bool active_,
54 zmq::socket_base_t *socket_,
55 const options_t &options_,
56 address_t *addr_);
57
58 // To be used once only, when creating the session.
59 void attach_pipe (zmq::pipe_t *pipe_);
60
61 // Following functions are the interface exposed towards the engine.
62 virtual void reset ();
63 void flush ();
64 void rollback ();
65 void engine_error (zmq::i_engine::error_reason_t reason_);
66
67 // i_pipe_events interface implementation.
68 void read_activated (zmq::pipe_t *pipe_);
69 void write_activated (zmq::pipe_t *pipe_);
70 void hiccuped (zmq::pipe_t *pipe_);
71 void pipe_terminated (zmq::pipe_t *pipe_);
72
73 // Delivers a message. Returns 0 if successful; -1 otherwise.
74 // The function takes ownership of the message.
75 virtual int push_msg (msg_t *msg_);
76
77 int zap_connect ();
78 bool zap_enabled ();
79
80 // Fetches a message. Returns 0 if successful; -1 otherwise.
81 // The caller is responsible for freeing the message when no
82 // longer used.
83 virtual int pull_msg (msg_t *msg_);
84
85 // Receives message from ZAP socket.
86 // Returns 0 on success; -1 otherwise.
87 // The caller is responsible for freeing the message.
88 int read_zap_msg (msg_t *msg_);
89
90 // Sends message to ZAP socket.
91 // Returns 0 on success; -1 otherwise.
92 // The function takes ownership of the message.
93 int write_zap_msg (msg_t *msg_);
94
95 socket_base_t *get_socket ();
96 const endpoint_uri_pair_t &get_endpoint () const;
97
98 protected:
99 session_base_t (zmq::io_thread_t *io_thread_,
100 bool active_,
101 zmq::socket_base_t *socket_,
102 const options_t &options_,
103 address_t *addr_);
104 virtual ~session_base_t ();
105
106 private:
107 void start_connecting (bool wait_);
108
109 typedef own_t *(session_base_t::*connecter_factory_fun_t) (
110 io_thread_t *io_thread, bool wait_);
111 typedef std::pair<const std::string, connecter_factory_fun_t>
112 connecter_factory_entry_t;
113 static connecter_factory_entry_t _connecter_factories[];
114 typedef std::map<std::string, connecter_factory_fun_t>
115 connecter_factory_map_t;
116 static connecter_factory_map_t _connecter_factories_map;
117
118 own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_);
119 own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_);
120 own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_);
121 own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_);
122 own_t *create_connecter_ws (io_thread_t *io_thread_, bool wait_);
123 own_t *create_connecter_wss (io_thread_t *io_thread_, bool wait_);
124
125 typedef void (session_base_t::*start_connecting_fun_t) (
126 io_thread_t *io_thread);
127 typedef std::pair<const std::string, start_connecting_fun_t>
128 start_connecting_entry_t;
129 static start_connecting_entry_t _start_connecting_entries[];
130 typedef std::map<std::string, start_connecting_fun_t>
131 start_connecting_map_t;
132 static start_connecting_map_t _start_connecting_map;
133
134 void start_connecting_pgm (io_thread_t *io_thread_);
135 void start_connecting_norm (io_thread_t *io_thread_);
136 void start_connecting_udp (io_thread_t *io_thread_);
137
138 void reconnect ();
139
140 // Handlers for incoming commands.
141 void process_plug ();
142 void process_attach (zmq::i_engine *engine_);
143 void process_term (int linger_);
144
145 // i_poll_events handlers.
146 void timer_event (int id_);
147
148 // Remove any half processed messages. Flush unflushed messages.
149 // Call this function when engine disconnect to get rid of leftovers.
150 void clean_pipes ();
151
152 // If true, this session (re)connects to the peer. Otherwise, it's
153 // a transient session created by the listener.
154 const bool _active;
155
156 // Pipe connecting the session to its socket.
157 zmq::pipe_t *_pipe;
158
159 // Pipe used to exchange messages with ZAP socket.
160 zmq::pipe_t *_zap_pipe;
161
162 // This set is added to with pipes we are disconnecting, but haven't yet completed
163 std::set<pipe_t *> _terminating_pipes;
164
165 // This flag is true if the remainder of the message being processed
166 // is still in the in pipe.
167 bool _incomplete_in;
168
169 // True if termination have been suspended to push the pending
170 // messages to the network.
171 bool _pending;
172
173 // The protocol I/O engine connected to the session.
174 zmq::i_engine *_engine;
175
176 // The socket the session belongs to.
177 zmq::socket_base_t *_socket;
178
179 // I/O thread the session is living in. It will be used to plug in
180 // the engines into the same thread.
181 zmq::io_thread_t *_io_thread;
182
183 // ID of the linger timer
184 enum
185 {
186 linger_timer_id = 0x20
187 };
188
189 // True is linger timer is running.
190 bool _has_linger_timer;
191
192 // Protocol and address to use when connecting.
193 address_t *_addr;
194
195 // TLS handshake, we need to take a copy when the session is created,
196 // in order to maintain the value at the creation time
197 char *_wss_hostname;
198
199 ZMQ_NON_COPYABLE_NOR_MOVABLE (session_base_t)
200};
201}
202
203#endif
204