1
2#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
3#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
4
5#if defined ZMQ_HAVE_NORM
6
7#include "io_object.hpp"
8#include "i_engine.hpp"
9#include "options.hpp"
10#include "v2_decoder.hpp"
11#include "v2_encoder.hpp"
12
13#include <normApi.h>
14
15namespace zmq
16{
17class io_thread_t;
18class msg_t;
19class session_base_t;
20
21class norm_engine_t : public io_object_t, public i_engine
22{
23 public:
24 norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
25 ~norm_engine_t ();
26
27 // create NORM instance, session, etc
28 int init (const char *network_, bool send, bool recv);
29 void shutdown ();
30
31 // i_engine interface implementation.
32 // Plug the engine to the session.
33 virtual void plug (zmq::io_thread_t *io_thread_,
34 class session_base_t *session_);
35
36 // Terminate and deallocate the engine. Note that 'detached'
37 // events are not fired on termination.
38 virtual void terminate ();
39
40 // This method is called by the session to signalise that more
41 // messages can be written to the pipe.
42 virtual bool restart_input ();
43
44 // This method is called by the session to signalise that there
45 // are messages to send available.
46 virtual void restart_output ();
47
48 virtual void zap_msg_available (){};
49
50 virtual const endpoint_uri_pair_t &get_endpoint () const;
51
52 // i_poll_events interface implementation.
53 // (we only need in_event() for NormEvent notification)
54 // (i.e., don't have any output events or timers (yet))
55 void in_event ();
56
57 private:
58 void unplug ();
59 void send_data ();
60 void recv_data (NormObjectHandle stream);
61
62
63 enum
64 {
65 BUFFER_SIZE = 2048
66 };
67
68 // Used to keep track of streams from multiple senders
69 class NormRxStreamState
70 {
71 public:
72 NormRxStreamState (NormObjectHandle normStream,
73 int64_t maxMsgSize,
74 bool zeroCopy,
75 int inBatchSize);
76 ~NormRxStreamState ();
77
78 NormObjectHandle GetStreamHandle () const { return norm_stream; }
79
80 bool Init ();
81
82 void SetRxReady (bool state) { rx_ready = state; }
83 bool IsRxReady () const { return rx_ready; }
84
85 void SetSync (bool state) { in_sync = state; }
86 bool InSync () const { return in_sync; }
87
88 // These are used to feed data to decoder
89 // and its underlying "msg" buffer
90 char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }
91 size_t GetBytesNeeded () const { return buffer_size - buffer_count; }
92 void IncrementBufferCount (size_t count) { buffer_count += count; }
93 msg_t *AccessMsg () { return zmq_decoder->msg (); }
94 // This invokes the decoder "decode" method
95 // returning 0 if more data is needed,
96 // 1 if the message is complete, If an error
97 // occurs the 'sync' is dropped and the
98 // decoder re-initialized
99 int Decode ();
100
101 class List
102 {
103 public:
104 List ();
105 ~List ();
106
107 void Append (NormRxStreamState &item);
108 void Remove (NormRxStreamState &item);
109
110 bool IsEmpty () const { return NULL == head; }
111
112 void Destroy ();
113
114 class Iterator
115 {
116 public:
117 Iterator (const List &list);
118 NormRxStreamState *GetNextItem ();
119
120 private:
121 NormRxStreamState *next_item;
122 };
123 friend class Iterator;
124
125 private:
126 NormRxStreamState *head;
127 NormRxStreamState *tail;
128
129 }; // end class zmq::norm_engine_t::NormRxStreamState::List
130
131 friend class List;
132
133 List *AccessList () { return list; }
134
135
136 private:
137 NormObjectHandle norm_stream;
138 int64_t max_msg_size;
139 bool zero_copy;
140 int in_batch_size;
141 bool in_sync;
142 bool rx_ready;
143 v2_decoder_t *zmq_decoder;
144 bool skip_norm_sync;
145 unsigned char *buffer_ptr;
146 size_t buffer_size;
147 size_t buffer_count;
148
149 NormRxStreamState *prev;
150 NormRxStreamState *next;
151 NormRxStreamState::List *list;
152
153 }; // end class zmq::norm_engine_t::NormRxStreamState
154
155 const endpoint_uri_pair_t _empty_endpoint;
156
157 session_base_t *zmq_session;
158 options_t options;
159 NormInstanceHandle norm_instance;
160 handle_t norm_descriptor_handle;
161 NormSessionHandle norm_session;
162 bool is_sender;
163 bool is_receiver;
164 // Sender state
165 msg_t tx_msg;
166 v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
167 NormObjectHandle norm_tx_stream;
168 bool tx_first_msg;
169 bool tx_more_bit;
170 bool zmq_output_ready; // zmq has msg(s) to send
171 bool norm_tx_ready; // norm has tx queue vacancy
172 // TBD - maybe don't need buffer if can access zmq message buffer directly?
173 char tx_buffer[BUFFER_SIZE];
174 unsigned int tx_index;
175 unsigned int tx_len;
176
177 // Receiver state
178 // Lists of norm rx streams from remote senders
179 bool zmq_input_ready; // zmq ready to receive msg(s)
180 NormRxStreamState::List
181 rx_pending_list; // rx streams waiting for data reception
182 NormRxStreamState::List
183 rx_ready_list; // rx streams ready for NormStreamRead()
184 NormRxStreamState::List
185 msg_ready_list; // rx streams w/ msg ready for push to zmq
186
187
188}; // end class norm_engine_t
189}
190
191#endif // ZMQ_HAVE_NORM
192
193#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
194