1 | |
2 | #include "precompiled.hpp" |
3 | |
4 | #include "platform.hpp" |
5 | |
6 | #if defined ZMQ_HAVE_NORM |
7 | |
8 | #include "norm_engine.hpp" |
9 | #include "session_base.hpp" |
10 | #include "v2_protocol.hpp" |
11 | |
12 | zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_, |
13 | const options_t &options_) : |
14 | io_object_t (parent_), |
15 | zmq_session (NULL), |
16 | options (options_), |
17 | norm_instance (NORM_INSTANCE_INVALID), |
18 | norm_session (NORM_SESSION_INVALID), |
19 | is_sender (false), |
20 | is_receiver (false), |
21 | zmq_encoder (0), |
22 | norm_tx_stream (NORM_OBJECT_INVALID), |
23 | tx_first_msg (true), |
24 | tx_more_bit (false), |
25 | zmq_output_ready (false), |
26 | norm_tx_ready (false), |
27 | tx_index (0), |
28 | tx_len (0), |
29 | zmq_input_ready (false) |
30 | { |
31 | int rc = tx_msg.init (); |
32 | errno_assert (0 == rc); |
33 | } |
34 | |
35 | zmq::norm_engine_t::~norm_engine_t () |
36 | { |
37 | shutdown (); // in case it was not already called |
38 | } |
39 | |
40 | |
41 | int zmq::norm_engine_t::init (const char *network_, bool send, bool recv) |
42 | { |
43 | // Parse the "network_" address int "iface", "addr", and "port" |
44 | // norm endpoint format: [id,][<iface>;]<addr>:<port> |
45 | // First, look for optional local NormNodeId |
46 | // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId) |
47 | NormNodeId localId = NORM_NODE_ANY; |
48 | const char *ifacePtr = strchr (network_, ','); |
49 | if (NULL != ifacePtr) { |
50 | size_t idLen = ifacePtr - network_; |
51 | if (idLen > 31) |
52 | idLen = 31; |
53 | char idText[32]; |
54 | strncpy (idText, network_, idLen); |
55 | idText[idLen] = '\0'; |
56 | localId = (NormNodeId) atoi (idText); |
57 | ifacePtr++; |
58 | } else { |
59 | ifacePtr = network_; |
60 | } |
61 | |
62 | // Second, look for optional multicast ifaceName |
63 | char ifaceName[256]; |
64 | const char *addrPtr = strchr (ifacePtr, ';'); |
65 | if (NULL != addrPtr) { |
66 | size_t ifaceLen = addrPtr - ifacePtr; |
67 | if (ifaceLen > 255) |
68 | ifaceLen = 255; // return error instead? |
69 | strncpy (ifaceName, ifacePtr, ifaceLen); |
70 | ifaceName[ifaceLen] = '\0'; |
71 | ifacePtr = ifaceName; |
72 | addrPtr++; |
73 | } else { |
74 | addrPtr = ifacePtr; |
75 | ifacePtr = NULL; |
76 | } |
77 | |
78 | // Finally, parse IP address and port number |
79 | const char *portPtr = strrchr (addrPtr, ':'); |
80 | if (NULL == portPtr) { |
81 | errno = EINVAL; |
82 | return -1; |
83 | } |
84 | |
85 | char addr[256]; |
86 | size_t addrLen = portPtr - addrPtr; |
87 | if (addrLen > 255) |
88 | addrLen = 255; |
89 | strncpy (addr, addrPtr, addrLen); |
90 | addr[addrLen] = '\0'; |
91 | portPtr++; |
92 | unsigned short portNumber = atoi (portPtr); |
93 | |
94 | if (NORM_INSTANCE_INVALID == norm_instance) { |
95 | if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) { |
96 | // errno set by whatever caused NormCreateInstance() to fail |
97 | return -1; |
98 | } |
99 | } |
100 | |
101 | // TBD - What do we use for our local NormNodeId? |
102 | // (for now we use automatic, IP addr based assignment or passed in 'id') |
103 | // a) Use ZMQ Identity somehow? |
104 | // b) Add function to use iface addr |
105 | // c) Randomize and implement a NORM session layer |
106 | // conflict detection/resolution protocol |
107 | |
108 | norm_session = NormCreateSession (norm_instance, addr, portNumber, localId); |
109 | if (NORM_SESSION_INVALID == norm_session) { |
110 | int savedErrno = errno; |
111 | NormDestroyInstance (norm_instance); |
112 | norm_instance = NORM_INSTANCE_INVALID; |
113 | errno = savedErrno; |
114 | return -1; |
115 | } |
116 | // There's many other useful NORM options that could be applied here |
117 | if (NormIsUnicastAddress (addr)) { |
118 | NormSetDefaultUnicastNack (norm_session, true); |
119 | } else { |
120 | // These only apply for multicast sessions |
121 | //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1 |
122 | NormSetTTL ( |
123 | norm_session, |
124 | 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported |
125 | NormSetRxPortReuse ( |
126 | norm_session, |
127 | true); // port reuse doesn't work for non-connected unicast |
128 | NormSetLoopback (norm_session, |
129 | true); // needed when multicast users on same machine |
130 | if (NULL != ifacePtr) { |
131 | // Note a bad interface may not be caught until sender or receiver start |
132 | // (Since sender/receiver is not yet started, this always succeeds here) |
133 | NormSetMulticastInterface (norm_session, ifacePtr); |
134 | } |
135 | } |
136 | |
137 | if (recv) { |
138 | // The alternative NORM_SYNC_CURRENT here would provide "instant" |
139 | // receiver sync to the sender's _current_ message transmission. |
140 | // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered |
141 | NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM); |
142 | if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) { |
143 | // errno set by whatever failed |
144 | int savedErrno = errno; |
145 | NormDestroyInstance (norm_instance); // session gets closed, too |
146 | norm_session = NORM_SESSION_INVALID; |
147 | norm_instance = NORM_INSTANCE_INVALID; |
148 | errno = savedErrno; |
149 | return -1; |
150 | } |
151 | is_receiver = true; |
152 | } |
153 | |
154 | if (send) { |
155 | // Pick a random sender instance id (aka norm sender session id) |
156 | NormSessionId instanceId = NormGetRandomSessionId (); |
157 | // TBD - provide "options" for some NORM sender parameters |
158 | if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400, |
159 | 16, 4)) { |
160 | // errno set by whatever failed |
161 | int savedErrno = errno; |
162 | NormDestroyInstance (norm_instance); // session gets closed, too |
163 | norm_session = NORM_SESSION_INVALID; |
164 | norm_instance = NORM_INSTANCE_INVALID; |
165 | errno = savedErrno; |
166 | return -1; |
167 | } |
168 | NormSetCongestionControl (norm_session, true); |
169 | norm_tx_ready = true; |
170 | is_sender = true; |
171 | if (NORM_OBJECT_INVALID |
172 | == (norm_tx_stream = |
173 | NormStreamOpen (norm_session, 2 * 1024 * 1024))) { |
174 | // errno set by whatever failed |
175 | int savedErrno = errno; |
176 | NormDestroyInstance (norm_instance); // session gets closed, too |
177 | norm_session = NORM_SESSION_INVALID; |
178 | norm_instance = NORM_INSTANCE_INVALID; |
179 | errno = savedErrno; |
180 | return -1; |
181 | } |
182 | } |
183 | |
184 | //NormSetMessageTrace(norm_session, true); |
185 | //NormSetDebugLevel(3); |
186 | //NormOpenDebugLog(norm_instance, "normLog.txt"); |
187 | |
188 | return 0; // no error |
189 | } // end zmq::norm_engine_t::init() |
190 | |
191 | void zmq::norm_engine_t::shutdown () |
192 | { |
193 | // TBD - implement a more graceful shutdown option |
194 | if (is_receiver) { |
195 | NormStopReceiver (norm_session); |
196 | |
197 | // delete any active NormRxStreamState |
198 | rx_pending_list.Destroy (); |
199 | rx_ready_list.Destroy (); |
200 | msg_ready_list.Destroy (); |
201 | |
202 | is_receiver = false; |
203 | } |
204 | if (is_sender) { |
205 | NormStopSender (norm_session); |
206 | is_sender = false; |
207 | } |
208 | if (NORM_SESSION_INVALID != norm_session) { |
209 | NormDestroySession (norm_session); |
210 | norm_session = NORM_SESSION_INVALID; |
211 | } |
212 | if (NORM_INSTANCE_INVALID != norm_instance) { |
213 | NormStopInstance (norm_instance); |
214 | NormDestroyInstance (norm_instance); |
215 | norm_instance = NORM_INSTANCE_INVALID; |
216 | } |
217 | } // end zmq::norm_engine_t::shutdown() |
218 | |
219 | void zmq::norm_engine_t::plug (io_thread_t *io_thread_, |
220 | session_base_t *session_) |
221 | { |
222 | // TBD - we may assign the NORM engine to an io_thread in the future??? |
223 | zmq_session = session_; |
224 | if (is_sender) |
225 | zmq_output_ready = true; |
226 | if (is_receiver) |
227 | zmq_input_ready = true; |
228 | |
229 | fd_t normDescriptor = NormGetDescriptor (norm_instance); |
230 | norm_descriptor_handle = add_fd (normDescriptor); |
231 | // Set POLLIN for notification of pending NormEvents |
232 | set_pollin (norm_descriptor_handle); |
233 | |
234 | if (is_sender) |
235 | send_data (); |
236 | |
237 | } // end zmq::norm_engine_t::init() |
238 | |
239 | void zmq::norm_engine_t::unplug () |
240 | { |
241 | rm_fd (norm_descriptor_handle); |
242 | |
243 | zmq_session = NULL; |
244 | } // end zmq::norm_engine_t::unplug() |
245 | |
246 | void zmq::norm_engine_t::terminate () |
247 | { |
248 | unplug (); |
249 | shutdown (); |
250 | delete this; |
251 | } |
252 | |
253 | void zmq::norm_engine_t::restart_output () |
254 | { |
255 | // There's new message data available from the session |
256 | zmq_output_ready = true; |
257 | if (norm_tx_ready) |
258 | send_data (); |
259 | |
260 | } // end zmq::norm_engine_t::restart_output() |
261 | |
262 | void zmq::norm_engine_t::send_data () |
263 | { |
264 | // Here we write as much as is available or we can |
265 | while (zmq_output_ready && norm_tx_ready) { |
266 | if (0 == tx_len) { |
267 | // Our tx_buffer needs data to send |
268 | // Get more data from encoder |
269 | size_t space = BUFFER_SIZE; |
270 | unsigned char *bufPtr = (unsigned char *) tx_buffer; |
271 | tx_len = zmq_encoder.encode (&bufPtr, space); |
272 | if (0 == tx_len) { |
273 | if (tx_first_msg) { |
274 | // We don't need to mark eom/flush until a message is sent |
275 | tx_first_msg = false; |
276 | } else { |
277 | // A prior message was completely written to stream, so |
278 | // mark end-of-message and possibly flush (to force packet transmission, |
279 | // even if it's not a full segment so message gets delivered quickly) |
280 | // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom |
281 | // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging |
282 | // but makes sure content is delivered quickly. Positive acknowledgements |
283 | // with flush override would make NORM more succinct here |
284 | NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE); |
285 | } |
286 | // Need to pull and load a new message to send |
287 | if (-1 == zmq_session->pull_msg (&tx_msg)) { |
288 | // We need to wait for "restart_output()" to be called by ZMQ |
289 | zmq_output_ready = false; |
290 | break; |
291 | } |
292 | zmq_encoder.load_msg (&tx_msg); |
293 | // Should we write message size header for NORM to use? Or expect NORM |
294 | // receiver to decode ZMQ message framing format(s)? |
295 | // OK - we need to use a byte to denote when the ZMQ frame is the _first_ |
296 | // frame of a message so it can be decoded properly when a receiver |
297 | // 'syncs' mid-stream. We key off the the state of the 'more_flag' |
298 | // I.e.,If more_flag _was_ false previously, this is the first |
299 | // frame of a ZMQ message. |
300 | if (tx_more_bit) |
301 | tx_buffer[0] = |
302 | (char) 0xff; // this is not first frame of message |
303 | else |
304 | tx_buffer[0] = 0x00; // this is first frame of message |
305 | tx_more_bit = (0 != (tx_msg.flags () & msg_t::more)); |
306 | // Go ahead an get a first chunk of the message |
307 | bufPtr++; |
308 | space--; |
309 | tx_len = 1 + zmq_encoder.encode (&bufPtr, space); |
310 | tx_index = 0; |
311 | } |
312 | } |
313 | // Do we have data in our tx_buffer pending |
314 | if (tx_index < tx_len) { |
315 | // We have data in our tx_buffer to send, so write it to the stream |
316 | tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index, |
317 | tx_len - tx_index); |
318 | if (tx_index < tx_len) { |
319 | // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY |
320 | norm_tx_ready = false; |
321 | break; |
322 | } |
323 | tx_len = 0; // all buffered data was written |
324 | } |
325 | } // end while (zmq_output_ready && norm_tx_ready) |
326 | } // end zmq::norm_engine_t::send_data() |
327 | |
328 | void zmq::norm_engine_t::in_event () |
329 | { |
330 | // This means a NormEvent is pending, so call NormGetNextEvent() and handle |
331 | NormEvent event; |
332 | if (!NormGetNextEvent (norm_instance, &event)) { |
333 | // NORM has died before we unplugged?! |
334 | zmq_assert (false); |
335 | return; |
336 | } |
337 | |
338 | switch (event.type) { |
339 | case NORM_TX_QUEUE_VACANCY: |
340 | case NORM_TX_QUEUE_EMPTY: |
341 | if (!norm_tx_ready) { |
342 | norm_tx_ready = true; |
343 | send_data (); |
344 | } |
345 | break; |
346 | |
347 | case NORM_RX_OBJECT_NEW: |
348 | //break; |
349 | case NORM_RX_OBJECT_UPDATED: |
350 | recv_data (event.object); |
351 | break; |
352 | |
353 | case NORM_RX_OBJECT_ABORTED: { |
354 | NormRxStreamState *rxState = |
355 | (NormRxStreamState *) NormObjectGetUserData (event.object); |
356 | if (NULL != rxState) { |
357 | // Remove the state from the list it's in |
358 | // This is now unnecessary since deletion takes care of list removal |
359 | // but in the interest of being clear ... |
360 | NormRxStreamState::List *list = rxState->AccessList (); |
361 | if (NULL != list) |
362 | list->Remove (*rxState); |
363 | } |
364 | delete rxState; |
365 | break; |
366 | } |
367 | case NORM_REMOTE_SENDER_INACTIVE: |
368 | // Here we free resources used for this formerly active sender. |
369 | // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may |
370 | // get some messages delivered twice. NORM_SYNC_CURRENT would |
371 | // mitigate that but might miss data at startup. Always tradeoffs. |
372 | // Instead of immediately deleting, we could instead initiate a |
373 | // user configurable timeout here to wait some amount of time |
374 | // after this event to declare the remote sender truly dead |
375 | // and delete its state??? |
376 | NormNodeDelete (event.sender); |
377 | break; |
378 | |
379 | default: |
380 | // We ignore some NORM events |
381 | break; |
382 | } |
383 | } // zmq::norm_engine_t::in_event() |
384 | |
385 | bool zmq::norm_engine_t::restart_input () |
386 | { |
387 | // TBD - should we check/assert that zmq_input_ready was false??? |
388 | zmq_input_ready = true; |
389 | // Process any pending received messages |
390 | if (!msg_ready_list.IsEmpty ()) |
391 | recv_data (NORM_OBJECT_INVALID); |
392 | |
393 | return true; |
394 | } // end zmq::norm_engine_t::restart_input() |
395 | |
396 | void zmq::norm_engine_t::recv_data (NormObjectHandle object) |
397 | { |
398 | if (NORM_OBJECT_INVALID != object) { |
399 | // Call result of NORM_RX_OBJECT_UPDATED notification |
400 | // This is a rx_ready indication for a new or existing rx stream |
401 | // First, determine if this is a stream we already know |
402 | zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object)); |
403 | // Since there can be multiple senders (publishers), we keep |
404 | // state for each separate rx stream. |
405 | NormRxStreamState *rxState = |
406 | (NormRxStreamState *) NormObjectGetUserData (object); |
407 | if (NULL == rxState) { |
408 | // This is a new stream, so create rxState with zmq decoder, etc |
409 | rxState = new (std::nothrow) |
410 | NormRxStreamState (object, options.maxmsgsize, options.zero_copy, |
411 | options.in_batch_size); |
412 | errno_assert (rxState); |
413 | |
414 | if (!rxState->Init ()) { |
415 | errno_assert (false); |
416 | delete rxState; |
417 | return; |
418 | } |
419 | NormObjectSetUserData (object, rxState); |
420 | } else if (!rxState->IsRxReady ()) { |
421 | // Existing non-ready stream, so remove from pending |
422 | // list to be promoted to rx_ready_list ... |
423 | rx_pending_list.Remove (*rxState); |
424 | } |
425 | if (!rxState->IsRxReady ()) { |
426 | // TBD - prepend up front for immediate service? |
427 | rxState->SetRxReady (true); |
428 | rx_ready_list.Append (*rxState); |
429 | } |
430 | } |
431 | // This loop repeats until we've read all data available from "rx ready" inbound streams |
432 | // and pushed any accumulated messages we can up to the zmq session. |
433 | while (!rx_ready_list.IsEmpty () |
434 | || (zmq_input_ready && !msg_ready_list.IsEmpty ())) { |
435 | // Iterate through our rx_ready streams, reading data into the decoder |
436 | // (This services incoming "rx ready" streams in a round-robin fashion) |
437 | NormRxStreamState::List::Iterator iterator (rx_ready_list); |
438 | NormRxStreamState *rxState; |
439 | while (NULL != (rxState = iterator.GetNextItem ())) { |
440 | switch (rxState->Decode ()) { |
441 | case 1: // msg completed |
442 | // Complete message decoded, move this stream to msg_ready_list |
443 | // to push the message up to the session below. Note the stream |
444 | // will be returned to the "rx_ready_list" after that's done |
445 | rx_ready_list.Remove (*rxState); |
446 | msg_ready_list.Append (*rxState); |
447 | continue; |
448 | |
449 | case -1: // decoding error (shouldn't happen w/ NORM, but ...) |
450 | // We need to re-sync this stream (decoder buffer was reset) |
451 | rxState->SetSync (false); |
452 | break; |
453 | |
454 | default: // 0 - need more data |
455 | break; |
456 | } |
457 | // Get more data from this stream |
458 | NormObjectHandle stream = rxState->GetStreamHandle (); |
459 | // First, make sure we're in sync ... |
460 | while (!rxState->InSync ()) { |
461 | // seek NORM message start |
462 | if (!NormStreamSeekMsgStart (stream)) { |
463 | // Need to wait for more data |
464 | break; |
465 | } |
466 | // read message 'flag' byte to see if this it's a 'final' frame |
467 | char syncFlag; |
468 | unsigned int numBytes = 1; |
469 | if (!NormStreamRead (stream, &syncFlag, &numBytes)) { |
470 | // broken stream (shouldn't happen after seek msg start?) |
471 | zmq_assert (false); |
472 | continue; |
473 | } |
474 | if (0 == numBytes) { |
475 | // This probably shouldn't happen either since we found msg start |
476 | // Need to wait for more data |
477 | break; |
478 | } |
479 | if (0 == syncFlag) |
480 | rxState->SetSync (true); |
481 | // else keep seeking ... |
482 | } // end while(!rxState->InSync()) |
483 | if (!rxState->InSync ()) { |
484 | // Need more data for this stream, so remove from "rx ready" |
485 | // list and iterate to next "rx ready" stream |
486 | rxState->SetRxReady (false); |
487 | // Move from rx_ready_list to rx_pending_list |
488 | rx_ready_list.Remove (*rxState); |
489 | rx_pending_list.Append (*rxState); |
490 | continue; |
491 | } |
492 | // Now we're actually ready to read data from the NORM stream to the zmq_decoder |
493 | // the underlying zmq_decoder->get_buffer() call sets how much is needed. |
494 | unsigned int numBytes = rxState->GetBytesNeeded (); |
495 | if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) { |
496 | // broken NORM stream, so re-sync |
497 | rxState->Init (); // TBD - check result |
498 | // This will retry syncing, and getting data from this stream |
499 | // since we don't increment the "it" iterator |
500 | continue; |
501 | } |
502 | rxState->IncrementBufferCount (numBytes); |
503 | if (0 == numBytes) { |
504 | // All the data available has been read |
505 | // Need to wait for NORM_RX_OBJECT_UPDATED for this stream |
506 | rxState->SetRxReady (false); |
507 | // Move from rx_ready_list to rx_pending_list |
508 | rx_ready_list.Remove (*rxState); |
509 | rx_pending_list.Append (*rxState); |
510 | } |
511 | } // end while(NULL != (rxState = iterator.GetNextItem())) |
512 | |
513 | if (zmq_input_ready) { |
514 | // At this point, we've made a pass through the "rx_ready" stream list |
515 | // Now make a pass through the "msg_pending" list (if the zmq session |
516 | // ready for more input). This may possibly return streams back to |
517 | // the "rx ready" stream list after their pending message is handled |
518 | NormRxStreamState::List::Iterator iterator (msg_ready_list); |
519 | NormRxStreamState *rxState; |
520 | while (NULL != (rxState = iterator.GetNextItem ())) { |
521 | msg_t *msg = rxState->AccessMsg (); |
522 | int rc = zmq_session->push_msg (msg); |
523 | if (-1 == rc) { |
524 | if (EAGAIN == errno) { |
525 | // need to wait until session calls "restart_input()" |
526 | zmq_input_ready = false; |
527 | break; |
528 | } else { |
529 | // session rejected message? |
530 | // TBD - handle this better |
531 | zmq_assert (false); |
532 | } |
533 | } |
534 | // else message was accepted. |
535 | msg_ready_list.Remove (*rxState); |
536 | if ( |
537 | rxState |
538 | ->IsRxReady ()) // Move back to "rx_ready" list to read more data |
539 | rx_ready_list.Append (*rxState); |
540 | else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED |
541 | msg_ready_list.Append (*rxState); |
542 | } // end while(NULL != (rxState = iterator.GetNextItem())) |
543 | } // end if (zmq_input_ready) |
544 | } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty())) |
545 | |
546 | // Alert zmq of the messages we have pushed up |
547 | zmq_session->flush (); |
548 | |
549 | } // end zmq::norm_engine_t::recv_data() |
550 | |
551 | zmq::norm_engine_t::NormRxStreamState::NormRxStreamState ( |
552 | NormObjectHandle normStream, |
553 | int64_t maxMsgSize, |
554 | bool zeroCopy, |
555 | int inBatchSize) : |
556 | norm_stream (normStream), |
557 | max_msg_size (maxMsgSize), |
558 | zero_copy (zeroCopy), |
559 | in_batch_size (inBatchSize), |
560 | in_sync (false), |
561 | rx_ready (false), |
562 | zmq_decoder (NULL), |
563 | skip_norm_sync (false), |
564 | buffer_ptr (NULL), |
565 | buffer_size (0), |
566 | buffer_count (0), |
567 | prev (NULL), |
568 | next (NULL), |
569 | list (NULL) |
570 | { |
571 | } |
572 | |
573 | zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState () |
574 | { |
575 | if (NULL != zmq_decoder) { |
576 | delete zmq_decoder; |
577 | zmq_decoder = NULL; |
578 | } |
579 | if (NULL != list) { |
580 | list->Remove (*this); |
581 | list = NULL; |
582 | } |
583 | } |
584 | |
585 | bool zmq::norm_engine_t::NormRxStreamState::Init () |
586 | { |
587 | in_sync = false; |
588 | skip_norm_sync = false; |
589 | if (NULL != zmq_decoder) |
590 | delete zmq_decoder; |
591 | zmq_decoder = |
592 | new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy); |
593 | alloc_assert (zmq_decoder); |
594 | if (NULL != zmq_decoder) { |
595 | buffer_count = 0; |
596 | buffer_size = 0; |
597 | zmq_decoder->get_buffer (&buffer_ptr, &buffer_size); |
598 | return true; |
599 | } else { |
600 | return false; |
601 | } |
602 | } // end zmq::norm_engine_t::NormRxStreamState::Init() |
603 | |
604 | // This decodes any pending data sitting in our stream decoder buffer |
605 | // It returns 1 upon message completion, -1 on error, 1 on msg completion |
606 | int zmq::norm_engine_t::NormRxStreamState::Decode () |
607 | { |
608 | // If we have pending bytes to decode, process those first |
609 | while (buffer_count > 0) { |
610 | // There's pending data for the decoder to decode |
611 | size_t processed = 0; |
612 | |
613 | // This a bit of a kludgy approach used to weed |
614 | // out the NORM ZMQ message transport "syncFlag" byte |
615 | // from the ZMQ message stream being decoded (but it works!) |
616 | if (skip_norm_sync) { |
617 | buffer_ptr++; |
618 | buffer_count--; |
619 | skip_norm_sync = false; |
620 | } |
621 | |
622 | int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed); |
623 | buffer_ptr += processed; |
624 | buffer_count -= processed; |
625 | switch (rc) { |
626 | case 1: |
627 | // msg completed |
628 | if (0 == buffer_count) { |
629 | buffer_size = 0; |
630 | zmq_decoder->get_buffer (&buffer_ptr, &buffer_size); |
631 | } |
632 | skip_norm_sync = true; |
633 | return 1; |
634 | case -1: |
635 | // decoder error (reset decoder and state variables) |
636 | in_sync = false; |
637 | skip_norm_sync = false; // will get consumed by norm sync check |
638 | Init (); |
639 | break; |
640 | |
641 | case 0: |
642 | // need more data, keep decoding until buffer exhausted |
643 | break; |
644 | } |
645 | } |
646 | // Reset buffer pointer/count for next read |
647 | buffer_count = 0; |
648 | buffer_size = 0; |
649 | zmq_decoder->get_buffer (&buffer_ptr, &buffer_size); |
650 | return 0; // need more data |
651 | |
652 | } // end zmq::norm_engine_t::NormRxStreamState::Decode() |
653 | |
654 | zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL) |
655 | { |
656 | } |
657 | |
658 | zmq::norm_engine_t::NormRxStreamState::List::~List () |
659 | { |
660 | Destroy (); |
661 | } |
662 | |
663 | void zmq::norm_engine_t::NormRxStreamState::List::Destroy () |
664 | { |
665 | NormRxStreamState *item = head; |
666 | while (NULL != item) { |
667 | Remove (*item); |
668 | delete item; |
669 | item = head; |
670 | } |
671 | } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy() |
672 | |
673 | void zmq::norm_engine_t::NormRxStreamState::List::Append ( |
674 | NormRxStreamState &item) |
675 | { |
676 | item.prev = tail; |
677 | if (NULL != tail) |
678 | tail->next = &item; |
679 | else |
680 | head = &item; |
681 | item.next = NULL; |
682 | tail = &item; |
683 | item.list = this; |
684 | } // end zmq::norm_engine_t::NormRxStreamState::List::Append() |
685 | |
686 | void zmq::norm_engine_t::NormRxStreamState::List::Remove ( |
687 | NormRxStreamState &item) |
688 | { |
689 | if (NULL != item.prev) |
690 | item.prev->next = item.next; |
691 | else |
692 | head = item.next; |
693 | if (NULL != item.next) |
694 | item.next->prev = item.prev; |
695 | else |
696 | tail = item.prev; |
697 | item.prev = item.next = NULL; |
698 | item.list = NULL; |
699 | } // end zmq::norm_engine_t::NormRxStreamState::List::Remove() |
700 | |
701 | zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator ( |
702 | const List &list) : |
703 | next_item (list.head) |
704 | { |
705 | } |
706 | |
707 | zmq::norm_engine_t::NormRxStreamState * |
708 | zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem () |
709 | { |
710 | NormRxStreamState *nextItem = next_item; |
711 | if (NULL != nextItem) |
712 | next_item = nextItem->next; |
713 | return nextItem; |
714 | } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem() |
715 | |
716 | const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const |
717 | { |
718 | return _empty_endpoint; |
719 | } |
720 | |
721 | #endif // ZMQ_HAVE_NORM |
722 | |