1 | |
2 | #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
3 | #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
4 | |
5 | #if defined ZMQ_HAVE_NORM |
6 | |
7 | #include "io_object.hpp" |
8 | #include "i_engine.hpp" |
9 | #include "options.hpp" |
10 | #include "v2_decoder.hpp" |
11 | #include "v2_encoder.hpp" |
12 | |
13 | #include <normApi.h> |
14 | |
15 | namespace zmq |
16 | { |
17 | class io_thread_t; |
18 | class msg_t; |
19 | class session_base_t; |
20 | |
21 | class norm_engine_t : public io_object_t, public i_engine |
22 | { |
23 | public: |
24 | norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_); |
25 | ~norm_engine_t (); |
26 | |
27 | // create NORM instance, session, etc |
28 | int init (const char *network_, bool send, bool recv); |
29 | void shutdown (); |
30 | |
31 | // i_engine interface implementation. |
32 | // Plug the engine to the session. |
33 | virtual void plug (zmq::io_thread_t *io_thread_, |
34 | class session_base_t *session_); |
35 | |
36 | // Terminate and deallocate the engine. Note that 'detached' |
37 | // events are not fired on termination. |
38 | virtual void terminate (); |
39 | |
40 | // This method is called by the session to signalise that more |
41 | // messages can be written to the pipe. |
42 | virtual bool restart_input (); |
43 | |
44 | // This method is called by the session to signalise that there |
45 | // are messages to send available. |
46 | virtual void restart_output (); |
47 | |
48 | virtual void zap_msg_available (){}; |
49 | |
50 | virtual const endpoint_uri_pair_t &get_endpoint () const; |
51 | |
52 | // i_poll_events interface implementation. |
53 | // (we only need in_event() for NormEvent notification) |
54 | // (i.e., don't have any output events or timers (yet)) |
55 | void in_event (); |
56 | |
57 | private: |
58 | void unplug (); |
59 | void send_data (); |
60 | void recv_data (NormObjectHandle stream); |
61 | |
62 | |
63 | enum |
64 | { |
65 | BUFFER_SIZE = 2048 |
66 | }; |
67 | |
68 | // Used to keep track of streams from multiple senders |
69 | class NormRxStreamState |
70 | { |
71 | public: |
72 | NormRxStreamState (NormObjectHandle normStream, |
73 | int64_t maxMsgSize, |
74 | bool zeroCopy, |
75 | int inBatchSize); |
76 | ~NormRxStreamState (); |
77 | |
78 | NormObjectHandle GetStreamHandle () const { return norm_stream; } |
79 | |
80 | bool Init (); |
81 | |
82 | void SetRxReady (bool state) { rx_ready = state; } |
83 | bool IsRxReady () const { return rx_ready; } |
84 | |
85 | void SetSync (bool state) { in_sync = state; } |
86 | bool InSync () const { return in_sync; } |
87 | |
88 | // These are used to feed data to decoder |
89 | // and its underlying "msg" buffer |
90 | char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); } |
91 | size_t GetBytesNeeded () const { return buffer_size - buffer_count; } |
92 | void IncrementBufferCount (size_t count) { buffer_count += count; } |
93 | msg_t *AccessMsg () { return zmq_decoder->msg (); } |
94 | // This invokes the decoder "decode" method |
95 | // returning 0 if more data is needed, |
96 | // 1 if the message is complete, If an error |
97 | // occurs the 'sync' is dropped and the |
98 | // decoder re-initialized |
99 | int Decode (); |
100 | |
101 | class List |
102 | { |
103 | public: |
104 | List (); |
105 | ~List (); |
106 | |
107 | void Append (NormRxStreamState &item); |
108 | void Remove (NormRxStreamState &item); |
109 | |
110 | bool IsEmpty () const { return NULL == head; } |
111 | |
112 | void Destroy (); |
113 | |
114 | class Iterator |
115 | { |
116 | public: |
117 | Iterator (const List &list); |
118 | NormRxStreamState *GetNextItem (); |
119 | |
120 | private: |
121 | NormRxStreamState *next_item; |
122 | }; |
123 | friend class Iterator; |
124 | |
125 | private: |
126 | NormRxStreamState *head; |
127 | NormRxStreamState *tail; |
128 | |
129 | }; // end class zmq::norm_engine_t::NormRxStreamState::List |
130 | |
131 | friend class List; |
132 | |
133 | List *AccessList () { return list; } |
134 | |
135 | |
136 | private: |
137 | NormObjectHandle norm_stream; |
138 | int64_t max_msg_size; |
139 | bool zero_copy; |
140 | int in_batch_size; |
141 | bool in_sync; |
142 | bool rx_ready; |
143 | v2_decoder_t *zmq_decoder; |
144 | bool skip_norm_sync; |
145 | unsigned char *buffer_ptr; |
146 | size_t buffer_size; |
147 | size_t buffer_count; |
148 | |
149 | NormRxStreamState *prev; |
150 | NormRxStreamState *next; |
151 | NormRxStreamState::List *list; |
152 | |
153 | }; // end class zmq::norm_engine_t::NormRxStreamState |
154 | |
155 | const endpoint_uri_pair_t _empty_endpoint; |
156 | |
157 | session_base_t *zmq_session; |
158 | options_t options; |
159 | NormInstanceHandle norm_instance; |
160 | handle_t norm_descriptor_handle; |
161 | NormSessionHandle norm_session; |
162 | bool is_sender; |
163 | bool is_receiver; |
164 | // Sender state |
165 | msg_t tx_msg; |
166 | v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) |
167 | NormObjectHandle norm_tx_stream; |
168 | bool tx_first_msg; |
169 | bool tx_more_bit; |
170 | bool zmq_output_ready; // zmq has msg(s) to send |
171 | bool norm_tx_ready; // norm has tx queue vacancy |
172 | // TBD - maybe don't need buffer if can access zmq message buffer directly? |
173 | char tx_buffer[BUFFER_SIZE]; |
174 | unsigned int tx_index; |
175 | unsigned int tx_len; |
176 | |
177 | // Receiver state |
178 | // Lists of norm rx streams from remote senders |
179 | bool zmq_input_ready; // zmq ready to receive msg(s) |
180 | NormRxStreamState::List |
181 | rx_pending_list; // rx streams waiting for data reception |
182 | NormRxStreamState::List |
183 | rx_ready_list; // rx streams ready for NormStreamRead() |
184 | NormRxStreamState::List |
185 | msg_ready_list; // rx streams w/ msg ready for push to zmq |
186 | |
187 | |
188 | }; // end class norm_engine_t |
189 | } |
190 | |
191 | #endif // ZMQ_HAVE_NORM |
192 | |
193 | #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
194 | |