1
2#include "precompiled.hpp"
3
4#include "platform.hpp"
5
6#if defined ZMQ_HAVE_NORM
7
8#include "norm_engine.hpp"
9#include "session_base.hpp"
10#include "v2_protocol.hpp"
11
12zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
13 const options_t &options_) :
14 io_object_t (parent_),
15 zmq_session (NULL),
16 options (options_),
17 norm_instance (NORM_INSTANCE_INVALID),
18 norm_session (NORM_SESSION_INVALID),
19 is_sender (false),
20 is_receiver (false),
21 zmq_encoder (0),
22 norm_tx_stream (NORM_OBJECT_INVALID),
23 tx_first_msg (true),
24 tx_more_bit (false),
25 zmq_output_ready (false),
26 norm_tx_ready (false),
27 tx_index (0),
28 tx_len (0),
29 zmq_input_ready (false)
30{
31 int rc = tx_msg.init ();
32 errno_assert (0 == rc);
33}
34
35zmq::norm_engine_t::~norm_engine_t ()
36{
37 shutdown (); // in case it was not already called
38}
39
40
41int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
42{
43 // Parse the "network_" address int "iface", "addr", and "port"
44 // norm endpoint format: [id,][<iface>;]<addr>:<port>
45 // First, look for optional local NormNodeId
46 // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
47 NormNodeId localId = NORM_NODE_ANY;
48 const char *ifacePtr = strchr (network_, ',');
49 if (NULL != ifacePtr) {
50 size_t idLen = ifacePtr - network_;
51 if (idLen > 31)
52 idLen = 31;
53 char idText[32];
54 strncpy (idText, network_, idLen);
55 idText[idLen] = '\0';
56 localId = (NormNodeId) atoi (idText);
57 ifacePtr++;
58 } else {
59 ifacePtr = network_;
60 }
61
62 // Second, look for optional multicast ifaceName
63 char ifaceName[256];
64 const char *addrPtr = strchr (ifacePtr, ';');
65 if (NULL != addrPtr) {
66 size_t ifaceLen = addrPtr - ifacePtr;
67 if (ifaceLen > 255)
68 ifaceLen = 255; // return error instead?
69 strncpy (ifaceName, ifacePtr, ifaceLen);
70 ifaceName[ifaceLen] = '\0';
71 ifacePtr = ifaceName;
72 addrPtr++;
73 } else {
74 addrPtr = ifacePtr;
75 ifacePtr = NULL;
76 }
77
78 // Finally, parse IP address and port number
79 const char *portPtr = strrchr (addrPtr, ':');
80 if (NULL == portPtr) {
81 errno = EINVAL;
82 return -1;
83 }
84
85 char addr[256];
86 size_t addrLen = portPtr - addrPtr;
87 if (addrLen > 255)
88 addrLen = 255;
89 strncpy (addr, addrPtr, addrLen);
90 addr[addrLen] = '\0';
91 portPtr++;
92 unsigned short portNumber = atoi (portPtr);
93
94 if (NORM_INSTANCE_INVALID == norm_instance) {
95 if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
96 // errno set by whatever caused NormCreateInstance() to fail
97 return -1;
98 }
99 }
100
101 // TBD - What do we use for our local NormNodeId?
102 // (for now we use automatic, IP addr based assignment or passed in 'id')
103 // a) Use ZMQ Identity somehow?
104 // b) Add function to use iface addr
105 // c) Randomize and implement a NORM session layer
106 // conflict detection/resolution protocol
107
108 norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
109 if (NORM_SESSION_INVALID == norm_session) {
110 int savedErrno = errno;
111 NormDestroyInstance (norm_instance);
112 norm_instance = NORM_INSTANCE_INVALID;
113 errno = savedErrno;
114 return -1;
115 }
116 // There's many other useful NORM options that could be applied here
117 if (NormIsUnicastAddress (addr)) {
118 NormSetDefaultUnicastNack (norm_session, true);
119 } else {
120 // These only apply for multicast sessions
121 //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
122 NormSetTTL (
123 norm_session,
124 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
125 NormSetRxPortReuse (
126 norm_session,
127 true); // port reuse doesn't work for non-connected unicast
128 NormSetLoopback (norm_session,
129 true); // needed when multicast users on same machine
130 if (NULL != ifacePtr) {
131 // Note a bad interface may not be caught until sender or receiver start
132 // (Since sender/receiver is not yet started, this always succeeds here)
133 NormSetMulticastInterface (norm_session, ifacePtr);
134 }
135 }
136
137 if (recv) {
138 // The alternative NORM_SYNC_CURRENT here would provide "instant"
139 // receiver sync to the sender's _current_ message transmission.
140 // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
141 NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
142 if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
143 // errno set by whatever failed
144 int savedErrno = errno;
145 NormDestroyInstance (norm_instance); // session gets closed, too
146 norm_session = NORM_SESSION_INVALID;
147 norm_instance = NORM_INSTANCE_INVALID;
148 errno = savedErrno;
149 return -1;
150 }
151 is_receiver = true;
152 }
153
154 if (send) {
155 // Pick a random sender instance id (aka norm sender session id)
156 NormSessionId instanceId = NormGetRandomSessionId ();
157 // TBD - provide "options" for some NORM sender parameters
158 if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
159 16, 4)) {
160 // errno set by whatever failed
161 int savedErrno = errno;
162 NormDestroyInstance (norm_instance); // session gets closed, too
163 norm_session = NORM_SESSION_INVALID;
164 norm_instance = NORM_INSTANCE_INVALID;
165 errno = savedErrno;
166 return -1;
167 }
168 NormSetCongestionControl (norm_session, true);
169 norm_tx_ready = true;
170 is_sender = true;
171 if (NORM_OBJECT_INVALID
172 == (norm_tx_stream =
173 NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
174 // errno set by whatever failed
175 int savedErrno = errno;
176 NormDestroyInstance (norm_instance); // session gets closed, too
177 norm_session = NORM_SESSION_INVALID;
178 norm_instance = NORM_INSTANCE_INVALID;
179 errno = savedErrno;
180 return -1;
181 }
182 }
183
184 //NormSetMessageTrace(norm_session, true);
185 //NormSetDebugLevel(3);
186 //NormOpenDebugLog(norm_instance, "normLog.txt");
187
188 return 0; // no error
189} // end zmq::norm_engine_t::init()
190
191void zmq::norm_engine_t::shutdown ()
192{
193 // TBD - implement a more graceful shutdown option
194 if (is_receiver) {
195 NormStopReceiver (norm_session);
196
197 // delete any active NormRxStreamState
198 rx_pending_list.Destroy ();
199 rx_ready_list.Destroy ();
200 msg_ready_list.Destroy ();
201
202 is_receiver = false;
203 }
204 if (is_sender) {
205 NormStopSender (norm_session);
206 is_sender = false;
207 }
208 if (NORM_SESSION_INVALID != norm_session) {
209 NormDestroySession (norm_session);
210 norm_session = NORM_SESSION_INVALID;
211 }
212 if (NORM_INSTANCE_INVALID != norm_instance) {
213 NormStopInstance (norm_instance);
214 NormDestroyInstance (norm_instance);
215 norm_instance = NORM_INSTANCE_INVALID;
216 }
217} // end zmq::norm_engine_t::shutdown()
218
219void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
220 session_base_t *session_)
221{
222 // TBD - we may assign the NORM engine to an io_thread in the future???
223 zmq_session = session_;
224 if (is_sender)
225 zmq_output_ready = true;
226 if (is_receiver)
227 zmq_input_ready = true;
228
229 fd_t normDescriptor = NormGetDescriptor (norm_instance);
230 norm_descriptor_handle = add_fd (normDescriptor);
231 // Set POLLIN for notification of pending NormEvents
232 set_pollin (norm_descriptor_handle);
233
234 if (is_sender)
235 send_data ();
236
237} // end zmq::norm_engine_t::init()
238
239void zmq::norm_engine_t::unplug ()
240{
241 rm_fd (norm_descriptor_handle);
242
243 zmq_session = NULL;
244} // end zmq::norm_engine_t::unplug()
245
246void zmq::norm_engine_t::terminate ()
247{
248 unplug ();
249 shutdown ();
250 delete this;
251}
252
253void zmq::norm_engine_t::restart_output ()
254{
255 // There's new message data available from the session
256 zmq_output_ready = true;
257 if (norm_tx_ready)
258 send_data ();
259
260} // end zmq::norm_engine_t::restart_output()
261
262void zmq::norm_engine_t::send_data ()
263{
264 // Here we write as much as is available or we can
265 while (zmq_output_ready && norm_tx_ready) {
266 if (0 == tx_len) {
267 // Our tx_buffer needs data to send
268 // Get more data from encoder
269 size_t space = BUFFER_SIZE;
270 unsigned char *bufPtr = (unsigned char *) tx_buffer;
271 tx_len = zmq_encoder.encode (&bufPtr, space);
272 if (0 == tx_len) {
273 if (tx_first_msg) {
274 // We don't need to mark eom/flush until a message is sent
275 tx_first_msg = false;
276 } else {
277 // A prior message was completely written to stream, so
278 // mark end-of-message and possibly flush (to force packet transmission,
279 // even if it's not a full segment so message gets delivered quickly)
280 // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
281 // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
282 // but makes sure content is delivered quickly. Positive acknowledgements
283 // with flush override would make NORM more succinct here
284 NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
285 }
286 // Need to pull and load a new message to send
287 if (-1 == zmq_session->pull_msg (&tx_msg)) {
288 // We need to wait for "restart_output()" to be called by ZMQ
289 zmq_output_ready = false;
290 break;
291 }
292 zmq_encoder.load_msg (&tx_msg);
293 // Should we write message size header for NORM to use? Or expect NORM
294 // receiver to decode ZMQ message framing format(s)?
295 // OK - we need to use a byte to denote when the ZMQ frame is the _first_
296 // frame of a message so it can be decoded properly when a receiver
297 // 'syncs' mid-stream. We key off the the state of the 'more_flag'
298 // I.e.,If more_flag _was_ false previously, this is the first
299 // frame of a ZMQ message.
300 if (tx_more_bit)
301 tx_buffer[0] =
302 (char) 0xff; // this is not first frame of message
303 else
304 tx_buffer[0] = 0x00; // this is first frame of message
305 tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
306 // Go ahead an get a first chunk of the message
307 bufPtr++;
308 space--;
309 tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
310 tx_index = 0;
311 }
312 }
313 // Do we have data in our tx_buffer pending
314 if (tx_index < tx_len) {
315 // We have data in our tx_buffer to send, so write it to the stream
316 tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
317 tx_len - tx_index);
318 if (tx_index < tx_len) {
319 // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
320 norm_tx_ready = false;
321 break;
322 }
323 tx_len = 0; // all buffered data was written
324 }
325 } // end while (zmq_output_ready && norm_tx_ready)
326} // end zmq::norm_engine_t::send_data()
327
328void zmq::norm_engine_t::in_event ()
329{
330 // This means a NormEvent is pending, so call NormGetNextEvent() and handle
331 NormEvent event;
332 if (!NormGetNextEvent (norm_instance, &event)) {
333 // NORM has died before we unplugged?!
334 zmq_assert (false);
335 return;
336 }
337
338 switch (event.type) {
339 case NORM_TX_QUEUE_VACANCY:
340 case NORM_TX_QUEUE_EMPTY:
341 if (!norm_tx_ready) {
342 norm_tx_ready = true;
343 send_data ();
344 }
345 break;
346
347 case NORM_RX_OBJECT_NEW:
348 //break;
349 case NORM_RX_OBJECT_UPDATED:
350 recv_data (event.object);
351 break;
352
353 case NORM_RX_OBJECT_ABORTED: {
354 NormRxStreamState *rxState =
355 (NormRxStreamState *) NormObjectGetUserData (event.object);
356 if (NULL != rxState) {
357 // Remove the state from the list it's in
358 // This is now unnecessary since deletion takes care of list removal
359 // but in the interest of being clear ...
360 NormRxStreamState::List *list = rxState->AccessList ();
361 if (NULL != list)
362 list->Remove (*rxState);
363 }
364 delete rxState;
365 break;
366 }
367 case NORM_REMOTE_SENDER_INACTIVE:
368 // Here we free resources used for this formerly active sender.
369 // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
370 // get some messages delivered twice. NORM_SYNC_CURRENT would
371 // mitigate that but might miss data at startup. Always tradeoffs.
372 // Instead of immediately deleting, we could instead initiate a
373 // user configurable timeout here to wait some amount of time
374 // after this event to declare the remote sender truly dead
375 // and delete its state???
376 NormNodeDelete (event.sender);
377 break;
378
379 default:
380 // We ignore some NORM events
381 break;
382 }
383} // zmq::norm_engine_t::in_event()
384
385bool zmq::norm_engine_t::restart_input ()
386{
387 // TBD - should we check/assert that zmq_input_ready was false???
388 zmq_input_ready = true;
389 // Process any pending received messages
390 if (!msg_ready_list.IsEmpty ())
391 recv_data (NORM_OBJECT_INVALID);
392
393 return true;
394} // end zmq::norm_engine_t::restart_input()
395
396void zmq::norm_engine_t::recv_data (NormObjectHandle object)
397{
398 if (NORM_OBJECT_INVALID != object) {
399 // Call result of NORM_RX_OBJECT_UPDATED notification
400 // This is a rx_ready indication for a new or existing rx stream
401 // First, determine if this is a stream we already know
402 zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
403 // Since there can be multiple senders (publishers), we keep
404 // state for each separate rx stream.
405 NormRxStreamState *rxState =
406 (NormRxStreamState *) NormObjectGetUserData (object);
407 if (NULL == rxState) {
408 // This is a new stream, so create rxState with zmq decoder, etc
409 rxState = new (std::nothrow)
410 NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
411 options.in_batch_size);
412 errno_assert (rxState);
413
414 if (!rxState->Init ()) {
415 errno_assert (false);
416 delete rxState;
417 return;
418 }
419 NormObjectSetUserData (object, rxState);
420 } else if (!rxState->IsRxReady ()) {
421 // Existing non-ready stream, so remove from pending
422 // list to be promoted to rx_ready_list ...
423 rx_pending_list.Remove (*rxState);
424 }
425 if (!rxState->IsRxReady ()) {
426 // TBD - prepend up front for immediate service?
427 rxState->SetRxReady (true);
428 rx_ready_list.Append (*rxState);
429 }
430 }
431 // This loop repeats until we've read all data available from "rx ready" inbound streams
432 // and pushed any accumulated messages we can up to the zmq session.
433 while (!rx_ready_list.IsEmpty ()
434 || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
435 // Iterate through our rx_ready streams, reading data into the decoder
436 // (This services incoming "rx ready" streams in a round-robin fashion)
437 NormRxStreamState::List::Iterator iterator (rx_ready_list);
438 NormRxStreamState *rxState;
439 while (NULL != (rxState = iterator.GetNextItem ())) {
440 switch (rxState->Decode ()) {
441 case 1: // msg completed
442 // Complete message decoded, move this stream to msg_ready_list
443 // to push the message up to the session below. Note the stream
444 // will be returned to the "rx_ready_list" after that's done
445 rx_ready_list.Remove (*rxState);
446 msg_ready_list.Append (*rxState);
447 continue;
448
449 case -1: // decoding error (shouldn't happen w/ NORM, but ...)
450 // We need to re-sync this stream (decoder buffer was reset)
451 rxState->SetSync (false);
452 break;
453
454 default: // 0 - need more data
455 break;
456 }
457 // Get more data from this stream
458 NormObjectHandle stream = rxState->GetStreamHandle ();
459 // First, make sure we're in sync ...
460 while (!rxState->InSync ()) {
461 // seek NORM message start
462 if (!NormStreamSeekMsgStart (stream)) {
463 // Need to wait for more data
464 break;
465 }
466 // read message 'flag' byte to see if this it's a 'final' frame
467 char syncFlag;
468 unsigned int numBytes = 1;
469 if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
470 // broken stream (shouldn't happen after seek msg start?)
471 zmq_assert (false);
472 continue;
473 }
474 if (0 == numBytes) {
475 // This probably shouldn't happen either since we found msg start
476 // Need to wait for more data
477 break;
478 }
479 if (0 == syncFlag)
480 rxState->SetSync (true);
481 // else keep seeking ...
482 } // end while(!rxState->InSync())
483 if (!rxState->InSync ()) {
484 // Need more data for this stream, so remove from "rx ready"
485 // list and iterate to next "rx ready" stream
486 rxState->SetRxReady (false);
487 // Move from rx_ready_list to rx_pending_list
488 rx_ready_list.Remove (*rxState);
489 rx_pending_list.Append (*rxState);
490 continue;
491 }
492 // Now we're actually ready to read data from the NORM stream to the zmq_decoder
493 // the underlying zmq_decoder->get_buffer() call sets how much is needed.
494 unsigned int numBytes = rxState->GetBytesNeeded ();
495 if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
496 // broken NORM stream, so re-sync
497 rxState->Init (); // TBD - check result
498 // This will retry syncing, and getting data from this stream
499 // since we don't increment the "it" iterator
500 continue;
501 }
502 rxState->IncrementBufferCount (numBytes);
503 if (0 == numBytes) {
504 // All the data available has been read
505 // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
506 rxState->SetRxReady (false);
507 // Move from rx_ready_list to rx_pending_list
508 rx_ready_list.Remove (*rxState);
509 rx_pending_list.Append (*rxState);
510 }
511 } // end while(NULL != (rxState = iterator.GetNextItem()))
512
513 if (zmq_input_ready) {
514 // At this point, we've made a pass through the "rx_ready" stream list
515 // Now make a pass through the "msg_pending" list (if the zmq session
516 // ready for more input). This may possibly return streams back to
517 // the "rx ready" stream list after their pending message is handled
518 NormRxStreamState::List::Iterator iterator (msg_ready_list);
519 NormRxStreamState *rxState;
520 while (NULL != (rxState = iterator.GetNextItem ())) {
521 msg_t *msg = rxState->AccessMsg ();
522 int rc = zmq_session->push_msg (msg);
523 if (-1 == rc) {
524 if (EAGAIN == errno) {
525 // need to wait until session calls "restart_input()"
526 zmq_input_ready = false;
527 break;
528 } else {
529 // session rejected message?
530 // TBD - handle this better
531 zmq_assert (false);
532 }
533 }
534 // else message was accepted.
535 msg_ready_list.Remove (*rxState);
536 if (
537 rxState
538 ->IsRxReady ()) // Move back to "rx_ready" list to read more data
539 rx_ready_list.Append (*rxState);
540 else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
541 msg_ready_list.Append (*rxState);
542 } // end while(NULL != (rxState = iterator.GetNextItem()))
543 } // end if (zmq_input_ready)
544 } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
545
546 // Alert zmq of the messages we have pushed up
547 zmq_session->flush ();
548
549} // end zmq::norm_engine_t::recv_data()
550
551zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
552 NormObjectHandle normStream,
553 int64_t maxMsgSize,
554 bool zeroCopy,
555 int inBatchSize) :
556 norm_stream (normStream),
557 max_msg_size (maxMsgSize),
558 zero_copy (zeroCopy),
559 in_batch_size (inBatchSize),
560 in_sync (false),
561 rx_ready (false),
562 zmq_decoder (NULL),
563 skip_norm_sync (false),
564 buffer_ptr (NULL),
565 buffer_size (0),
566 buffer_count (0),
567 prev (NULL),
568 next (NULL),
569 list (NULL)
570{
571}
572
573zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
574{
575 if (NULL != zmq_decoder) {
576 delete zmq_decoder;
577 zmq_decoder = NULL;
578 }
579 if (NULL != list) {
580 list->Remove (*this);
581 list = NULL;
582 }
583}
584
585bool zmq::norm_engine_t::NormRxStreamState::Init ()
586{
587 in_sync = false;
588 skip_norm_sync = false;
589 if (NULL != zmq_decoder)
590 delete zmq_decoder;
591 zmq_decoder =
592 new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
593 alloc_assert (zmq_decoder);
594 if (NULL != zmq_decoder) {
595 buffer_count = 0;
596 buffer_size = 0;
597 zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
598 return true;
599 } else {
600 return false;
601 }
602} // end zmq::norm_engine_t::NormRxStreamState::Init()
603
604// This decodes any pending data sitting in our stream decoder buffer
605// It returns 1 upon message completion, -1 on error, 1 on msg completion
606int zmq::norm_engine_t::NormRxStreamState::Decode ()
607{
608 // If we have pending bytes to decode, process those first
609 while (buffer_count > 0) {
610 // There's pending data for the decoder to decode
611 size_t processed = 0;
612
613 // This a bit of a kludgy approach used to weed
614 // out the NORM ZMQ message transport "syncFlag" byte
615 // from the ZMQ message stream being decoded (but it works!)
616 if (skip_norm_sync) {
617 buffer_ptr++;
618 buffer_count--;
619 skip_norm_sync = false;
620 }
621
622 int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
623 buffer_ptr += processed;
624 buffer_count -= processed;
625 switch (rc) {
626 case 1:
627 // msg completed
628 if (0 == buffer_count) {
629 buffer_size = 0;
630 zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
631 }
632 skip_norm_sync = true;
633 return 1;
634 case -1:
635 // decoder error (reset decoder and state variables)
636 in_sync = false;
637 skip_norm_sync = false; // will get consumed by norm sync check
638 Init ();
639 break;
640
641 case 0:
642 // need more data, keep decoding until buffer exhausted
643 break;
644 }
645 }
646 // Reset buffer pointer/count for next read
647 buffer_count = 0;
648 buffer_size = 0;
649 zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
650 return 0; // need more data
651
652} // end zmq::norm_engine_t::NormRxStreamState::Decode()
653
654zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL)
655{
656}
657
658zmq::norm_engine_t::NormRxStreamState::List::~List ()
659{
660 Destroy ();
661}
662
663void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
664{
665 NormRxStreamState *item = head;
666 while (NULL != item) {
667 Remove (*item);
668 delete item;
669 item = head;
670 }
671} // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
672
673void zmq::norm_engine_t::NormRxStreamState::List::Append (
674 NormRxStreamState &item)
675{
676 item.prev = tail;
677 if (NULL != tail)
678 tail->next = &item;
679 else
680 head = &item;
681 item.next = NULL;
682 tail = &item;
683 item.list = this;
684} // end zmq::norm_engine_t::NormRxStreamState::List::Append()
685
686void zmq::norm_engine_t::NormRxStreamState::List::Remove (
687 NormRxStreamState &item)
688{
689 if (NULL != item.prev)
690 item.prev->next = item.next;
691 else
692 head = item.next;
693 if (NULL != item.next)
694 item.next->prev = item.prev;
695 else
696 tail = item.prev;
697 item.prev = item.next = NULL;
698 item.list = NULL;
699} // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
700
701zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
702 const List &list) :
703 next_item (list.head)
704{
705}
706
707zmq::norm_engine_t::NormRxStreamState *
708zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
709{
710 NormRxStreamState *nextItem = next_item;
711 if (NULL != nextItem)
712 next_item = nextItem->next;
713 return nextItem;
714} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
715
716const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
717{
718 return _empty_endpoint;
719}
720
721#endif // ZMQ_HAVE_NORM
722