| 1 | |
| 2 | #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
| 3 | #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
| 4 | |
| 5 | #if defined ZMQ_HAVE_NORM |
| 6 | |
| 7 | #include "io_object.hpp" |
| 8 | #include "i_engine.hpp" |
| 9 | #include "options.hpp" |
| 10 | #include "v2_decoder.hpp" |
| 11 | #include "v2_encoder.hpp" |
| 12 | |
| 13 | #include <normApi.h> |
| 14 | |
| 15 | namespace zmq |
| 16 | { |
| 17 | class io_thread_t; |
| 18 | class msg_t; |
| 19 | class session_base_t; |
| 20 | |
| 21 | class norm_engine_t : public io_object_t, public i_engine |
| 22 | { |
| 23 | public: |
| 24 | norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_); |
| 25 | ~norm_engine_t (); |
| 26 | |
| 27 | // create NORM instance, session, etc |
| 28 | int init (const char *network_, bool send, bool recv); |
| 29 | void shutdown (); |
| 30 | |
| 31 | // i_engine interface implementation. |
| 32 | // Plug the engine to the session. |
| 33 | virtual void plug (zmq::io_thread_t *io_thread_, |
| 34 | class session_base_t *session_); |
| 35 | |
| 36 | // Terminate and deallocate the engine. Note that 'detached' |
| 37 | // events are not fired on termination. |
| 38 | virtual void terminate (); |
| 39 | |
| 40 | // This method is called by the session to signalise that more |
| 41 | // messages can be written to the pipe. |
| 42 | virtual bool restart_input (); |
| 43 | |
| 44 | // This method is called by the session to signalise that there |
| 45 | // are messages to send available. |
| 46 | virtual void restart_output (); |
| 47 | |
| 48 | virtual void zap_msg_available (){}; |
| 49 | |
| 50 | virtual const endpoint_uri_pair_t &get_endpoint () const; |
| 51 | |
| 52 | // i_poll_events interface implementation. |
| 53 | // (we only need in_event() for NormEvent notification) |
| 54 | // (i.e., don't have any output events or timers (yet)) |
| 55 | void in_event (); |
| 56 | |
| 57 | private: |
| 58 | void unplug (); |
| 59 | void send_data (); |
| 60 | void recv_data (NormObjectHandle stream); |
| 61 | |
| 62 | |
| 63 | enum |
| 64 | { |
| 65 | BUFFER_SIZE = 2048 |
| 66 | }; |
| 67 | |
| 68 | // Used to keep track of streams from multiple senders |
| 69 | class NormRxStreamState |
| 70 | { |
| 71 | public: |
| 72 | NormRxStreamState (NormObjectHandle normStream, |
| 73 | int64_t maxMsgSize, |
| 74 | bool zeroCopy, |
| 75 | int inBatchSize); |
| 76 | ~NormRxStreamState (); |
| 77 | |
| 78 | NormObjectHandle GetStreamHandle () const { return norm_stream; } |
| 79 | |
| 80 | bool Init (); |
| 81 | |
| 82 | void SetRxReady (bool state) { rx_ready = state; } |
| 83 | bool IsRxReady () const { return rx_ready; } |
| 84 | |
| 85 | void SetSync (bool state) { in_sync = state; } |
| 86 | bool InSync () const { return in_sync; } |
| 87 | |
| 88 | // These are used to feed data to decoder |
| 89 | // and its underlying "msg" buffer |
| 90 | char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); } |
| 91 | size_t GetBytesNeeded () const { return buffer_size - buffer_count; } |
| 92 | void IncrementBufferCount (size_t count) { buffer_count += count; } |
| 93 | msg_t *AccessMsg () { return zmq_decoder->msg (); } |
| 94 | // This invokes the decoder "decode" method |
| 95 | // returning 0 if more data is needed, |
| 96 | // 1 if the message is complete, If an error |
| 97 | // occurs the 'sync' is dropped and the |
| 98 | // decoder re-initialized |
| 99 | int Decode (); |
| 100 | |
| 101 | class List |
| 102 | { |
| 103 | public: |
| 104 | List (); |
| 105 | ~List (); |
| 106 | |
| 107 | void Append (NormRxStreamState &item); |
| 108 | void Remove (NormRxStreamState &item); |
| 109 | |
| 110 | bool IsEmpty () const { return NULL == head; } |
| 111 | |
| 112 | void Destroy (); |
| 113 | |
| 114 | class Iterator |
| 115 | { |
| 116 | public: |
| 117 | Iterator (const List &list); |
| 118 | NormRxStreamState *GetNextItem (); |
| 119 | |
| 120 | private: |
| 121 | NormRxStreamState *next_item; |
| 122 | }; |
| 123 | friend class Iterator; |
| 124 | |
| 125 | private: |
| 126 | NormRxStreamState *head; |
| 127 | NormRxStreamState *tail; |
| 128 | |
| 129 | }; // end class zmq::norm_engine_t::NormRxStreamState::List |
| 130 | |
| 131 | friend class List; |
| 132 | |
| 133 | List *AccessList () { return list; } |
| 134 | |
| 135 | |
| 136 | private: |
| 137 | NormObjectHandle norm_stream; |
| 138 | int64_t max_msg_size; |
| 139 | bool zero_copy; |
| 140 | int in_batch_size; |
| 141 | bool in_sync; |
| 142 | bool rx_ready; |
| 143 | v2_decoder_t *zmq_decoder; |
| 144 | bool skip_norm_sync; |
| 145 | unsigned char *buffer_ptr; |
| 146 | size_t buffer_size; |
| 147 | size_t buffer_count; |
| 148 | |
| 149 | NormRxStreamState *prev; |
| 150 | NormRxStreamState *next; |
| 151 | NormRxStreamState::List *list; |
| 152 | |
| 153 | }; // end class zmq::norm_engine_t::NormRxStreamState |
| 154 | |
| 155 | const endpoint_uri_pair_t _empty_endpoint; |
| 156 | |
| 157 | session_base_t *zmq_session; |
| 158 | options_t options; |
| 159 | NormInstanceHandle norm_instance; |
| 160 | handle_t norm_descriptor_handle; |
| 161 | NormSessionHandle norm_session; |
| 162 | bool is_sender; |
| 163 | bool is_receiver; |
| 164 | // Sender state |
| 165 | msg_t tx_msg; |
| 166 | v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) |
| 167 | NormObjectHandle norm_tx_stream; |
| 168 | bool tx_first_msg; |
| 169 | bool tx_more_bit; |
| 170 | bool zmq_output_ready; // zmq has msg(s) to send |
| 171 | bool norm_tx_ready; // norm has tx queue vacancy |
| 172 | // TBD - maybe don't need buffer if can access zmq message buffer directly? |
| 173 | char tx_buffer[BUFFER_SIZE]; |
| 174 | unsigned int tx_index; |
| 175 | unsigned int tx_len; |
| 176 | |
| 177 | // Receiver state |
| 178 | // Lists of norm rx streams from remote senders |
| 179 | bool zmq_input_ready; // zmq ready to receive msg(s) |
| 180 | NormRxStreamState::List |
| 181 | rx_pending_list; // rx streams waiting for data reception |
| 182 | NormRxStreamState::List |
| 183 | rx_ready_list; // rx streams ready for NormStreamRead() |
| 184 | NormRxStreamState::List |
| 185 | msg_ready_list; // rx streams w/ msg ready for push to zmq |
| 186 | |
| 187 | |
| 188 | }; // end class norm_engine_t |
| 189 | } |
| 190 | |
| 191 | #endif // ZMQ_HAVE_NORM |
| 192 | |
| 193 | #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__ |
| 194 | |