1/**********
2This library is free software; you can redistribute it and/or modify it under
3the terms of the GNU Lesser General Public License as published by the
4Free Software Foundation; either version 3 of the License, or (at your
5option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7This library is distributed in the hope that it will be useful, but WITHOUT
8ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10more details.
11
12You should have received a copy of the GNU Lesser General Public License
13along with this library; if not, write to the Free Software Foundation, Inc.,
1451 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15**********/
16// "liveMedia"
17// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
18// RTP source for a common kind of payload format: Those that pack multiple,
19// complete codec frames (as many as possible) into each RTP packet.
20// Implementation
21
22#include "MultiFramedRTPSource.hh"
23#include "RTCP.hh"
24#include "GroupsockHelper.hh"
25#include <string.h>
26
27////////// ReorderingPacketBuffer definition //////////
28
29class ReorderingPacketBuffer {
30public:
31 ReorderingPacketBuffer(BufferedPacketFactory* packetFactory);
32 virtual ~ReorderingPacketBuffer();
33 void reset();
34
35 BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource);
36 Boolean storePacket(BufferedPacket* bPacket);
37 BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded);
38 void releaseUsedPacket(BufferedPacket* packet);
39 void freePacket(BufferedPacket* packet) {
40 if (packet != fSavedPacket) {
41 delete packet;
42 } else {
43 fSavedPacketFree = True;
44 }
45 }
46 Boolean isEmpty() const { return fHeadPacket == NULL; }
47
48 void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; }
49 void resetHaveSeenFirstPacket() { fHaveSeenFirstPacket = False; }
50
51private:
52 BufferedPacketFactory* fPacketFactory;
53 unsigned fThresholdTime; // uSeconds
54 Boolean fHaveSeenFirstPacket; // used to set initial "fNextExpectedSeqNo"
55 unsigned short fNextExpectedSeqNo;
56 BufferedPacket* fHeadPacket;
57 BufferedPacket* fTailPacket;
58 BufferedPacket* fSavedPacket;
59 // to avoid calling new/free in the common case
60 Boolean fSavedPacketFree;
61};
62
63
64////////// MultiFramedRTPSource implementation //////////
65
66MultiFramedRTPSource
67::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs,
68 unsigned char rtpPayloadFormat,
69 unsigned rtpTimestampFrequency,
70 BufferedPacketFactory* packetFactory)
71 : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) {
72 reset();
73 fReorderingBuffer = new ReorderingPacketBuffer(packetFactory);
74
75 // Try to use a big receive buffer for RTP:
76 increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024);
77}
78
79void MultiFramedRTPSource::reset() {
80 fCurrentPacketBeginsFrame = True; // by default
81 fCurrentPacketCompletesFrame = True; // by default
82 fAreDoingNetworkReads = False;
83 fPacketReadInProgress = NULL;
84 fNeedDelivery = False;
85 fPacketLossInFragmentedFrame = False;
86}
87
88MultiFramedRTPSource::~MultiFramedRTPSource() {
89 delete fReorderingBuffer;
90}
91
92Boolean MultiFramedRTPSource
93::processSpecialHeader(BufferedPacket* /*packet*/,
94 unsigned& resultSpecialHeaderSize) {
95 // Default implementation: Assume no special header:
96 resultSpecialHeaderSize = 0;
97 return True;
98}
99
100Boolean MultiFramedRTPSource
101::packetIsUsableInJitterCalculation(unsigned char* /*packet*/,
102 unsigned /*packetSize*/) {
103 // Default implementation:
104 return True;
105}
106
107void MultiFramedRTPSource::doStopGettingFrames() {
108 if (fPacketReadInProgress != NULL) {
109 fReorderingBuffer->freePacket(fPacketReadInProgress);
110 fPacketReadInProgress = NULL;
111 }
112 envir().taskScheduler().unscheduleDelayedTask(nextTask());
113 fRTPInterface.stopNetworkReading();
114 fReorderingBuffer->reset();
115 reset();
116}
117
118void MultiFramedRTPSource::doGetNextFrame() {
119 if (!fAreDoingNetworkReads) {
120 // Turn on background read handling of incoming packets:
121 fAreDoingNetworkReads = True;
122 TaskScheduler::BackgroundHandlerProc* handler
123 = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
124 fRTPInterface.startNetworkReading(handler);
125 }
126
127 fSavedTo = fTo;
128 fSavedMaxSize = fMaxSize;
129 fFrameSize = 0; // for now
130 fNeedDelivery = True;
131 doGetNextFrame1();
132}
133
134void MultiFramedRTPSource::doGetNextFrame1() {
135 while (fNeedDelivery) {
136 // If we already have packet data available, then deliver it now.
137 Boolean packetLossPrecededThis;
138 BufferedPacket* nextPacket
139 = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
140 if (nextPacket == NULL) break;
141
142 fNeedDelivery = False;
143
144 if (nextPacket->useCount() == 0) {
145 // Before using the packet, check whether it has a special header
146 // that needs to be processed:
147 unsigned specialHeaderSize;
148 if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
149 // Something's wrong with the header; reject the packet:
150 fReorderingBuffer->releaseUsedPacket(nextPacket);
151 fNeedDelivery = True;
152 continue;
153 }
154 nextPacket->skip(specialHeaderSize);
155 }
156
157 // Check whether we're part of a multi-packet frame, and whether
158 // there was packet loss that would render this packet unusable:
159 if (fCurrentPacketBeginsFrame) {
160 if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
161 // We didn't get all of the previous frame.
162 // Forget any data that we used from it:
163 fTo = fSavedTo; fMaxSize = fSavedMaxSize;
164 fFrameSize = 0;
165 }
166 fPacketLossInFragmentedFrame = False;
167 } else if (packetLossPrecededThis) {
168 // We're in a multi-packet frame, with preceding packet loss
169 fPacketLossInFragmentedFrame = True;
170 }
171 if (fPacketLossInFragmentedFrame) {
172 // This packet is unusable; reject it:
173 fReorderingBuffer->releaseUsedPacket(nextPacket);
174 fNeedDelivery = True;
175 continue;
176 }
177
178 // The packet is usable. Deliver all or part of it to our caller:
179 unsigned frameSize;
180 nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
181 fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
182 fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
183 fCurPacketMarkerBit);
184 fFrameSize += frameSize;
185
186 if (!nextPacket->hasUsableData()) {
187 // We're completely done with this packet now
188 fReorderingBuffer->releaseUsedPacket(nextPacket);
189 }
190
191 if (fCurrentPacketCompletesFrame && fFrameSize > 0) {
192 // We have all the data that the client wants.
193 if (fNumTruncatedBytes > 0) {
194 envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
195 << fSavedMaxSize << "). "
196 << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
197 }
198 // Call our own 'after getting' function, so that the downstream object can consume the data:
199 if (fReorderingBuffer->isEmpty()) {
200 // Common case optimization: There are no more queued incoming packets, so this code will not get
201 // executed again without having first returned to the event loop. Call our 'after getting' function
202 // directly, because there's no risk of a long chain of recursion (and thus stack overflow):
203 afterGetting(this);
204 } else {
205 // Special case: Call our 'after getting' function via the event loop.
206 nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
207 (TaskFunc*)FramedSource::afterGetting, this);
208 }
209 } else {
210 // This packet contained fragmented data, and does not complete
211 // the data that the client wants. Keep getting data:
212 fTo += frameSize; fMaxSize -= frameSize;
213 fNeedDelivery = True;
214 }
215 }
216}
217
218void MultiFramedRTPSource
219::setPacketReorderingThresholdTime(unsigned uSeconds) {
220 fReorderingBuffer->setThresholdTime(uSeconds);
221}
222
223#define ADVANCE(n) do { bPacket->skip(n); } while (0)
224
225void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/) {
226 source->networkReadHandler1();
227}
228
229void MultiFramedRTPSource::networkReadHandler1() {
230 BufferedPacket* bPacket = fPacketReadInProgress;
231 if (bPacket == NULL) {
232 // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:
233 bPacket = fReorderingBuffer->getFreePacket(this);
234 }
235
236 // Read the network packet, and perform sanity checks on the RTP header:
237 Boolean readSuccess = False;
238 do {
239 struct sockaddr_in fromAddress;
240 Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
241 if (!bPacket->fillInData(fRTPInterface, fromAddress, packetReadWasIncomplete)) {
242 if (bPacket->bytesAvailable() == 0) { // should not happen??
243 envir() << "MultiFramedRTPSource internal error: Hit limit when reading incoming packet over TCP\n";
244 }
245 fPacketReadInProgress = NULL;
246 break;
247 }
248 if (packetReadWasIncomplete) {
249 // We need additional read(s) before we can process the incoming packet:
250 fPacketReadInProgress = bPacket;
251 return;
252 } else {
253 fPacketReadInProgress = NULL;
254 }
255#ifdef TEST_LOSS
256 setPacketReorderingThresholdTime(0);
257 // don't wait for 'lost' packets to arrive out-of-order later
258 if ((our_random()%10) == 0) break; // simulate 10% packet loss
259#endif
260
261 // Check for the 12-byte RTP header:
262 if (bPacket->dataSize() < 12) break;
263 unsigned rtpHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
264 Boolean rtpMarkerBit = (rtpHdr&0x00800000) != 0;
265 unsigned rtpTimestamp = ntohl(*(u_int32_t*)(bPacket->data()));ADVANCE(4);
266 unsigned rtpSSRC = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
267
268 // Check the RTP version number (it should be 2):
269 if ((rtpHdr&0xC0000000) != 0x80000000) break;
270
271 // Check the Payload Type.
272 unsigned char rtpPayloadType = (unsigned char)((rtpHdr&0x007F0000)>>16);
273 if (rtpPayloadType != rtpPayloadFormat()) {
274 if (fRTCPInstanceForMultiplexedRTCPPackets != NULL
275 && rtpPayloadType >= 64 && rtpPayloadType <= 95) {
276 // This is a multiplexed RTCP packet, and we've been asked to deliver such packets.
277 // Do so now:
278 fRTCPInstanceForMultiplexedRTCPPackets
279 ->injectReport(bPacket->data()-12, bPacket->dataSize()+12, fromAddress);
280 }
281 break;
282 }
283
284 // Skip over any CSRC identifiers in the header:
285 unsigned cc = (rtpHdr>>24)&0x0F;
286 if (bPacket->dataSize() < cc*4) break;
287 ADVANCE(cc*4);
288
289 // Check for (& ignore) any RTP header extension
290 if (rtpHdr&0x10000000) {
291 if (bPacket->dataSize() < 4) break;
292 unsigned extHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);
293 unsigned remExtSize = 4*(extHdr&0xFFFF);
294 if (bPacket->dataSize() < remExtSize) break;
295 ADVANCE(remExtSize);
296 }
297
298 // Discard any padding bytes:
299 if (rtpHdr&0x20000000) {
300 if (bPacket->dataSize() == 0) break;
301 unsigned numPaddingBytes
302 = (unsigned)(bPacket->data())[bPacket->dataSize()-1];
303 if (bPacket->dataSize() < numPaddingBytes) break;
304 bPacket->removePadding(numPaddingBytes);
305 }
306
307 // The rest of the packet is the usable data. Record and save it:
308 if (rtpSSRC != fLastReceivedSSRC) {
309 // The SSRC of incoming packets has changed. Unfortunately we don't yet handle streams that contain multiple SSRCs,
310 // but we can handle a single-SSRC stream where the SSRC changes occasionally:
311 fLastReceivedSSRC = rtpSSRC;
312 fReorderingBuffer->resetHaveSeenFirstPacket();
313 }
314 unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);
315 Boolean usableInJitterCalculation
316 = packetIsUsableInJitterCalculation((bPacket->data()),
317 bPacket->dataSize());
318 struct timeval presentationTime; // computed by:
319 Boolean hasBeenSyncedUsingRTCP; // computed by:
320 receptionStatsDB()
321 .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
322 timestampFrequency(),
323 usableInJitterCalculation, presentationTime,
324 hasBeenSyncedUsingRTCP, bPacket->dataSize());
325
326 // Fill in the rest of the packet descriptor, and store it:
327 struct timeval timeNow;
328 gettimeofday(&timeNow, NULL);
329 bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
330 hasBeenSyncedUsingRTCP, rtpMarkerBit,
331 timeNow);
332 if (!fReorderingBuffer->storePacket(bPacket)) break;
333
334 readSuccess = True;
335 } while (0);
336 if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
337
338 doGetNextFrame1();
339 // If we didn't get proper data this time, we'll get another chance
340}
341
342
343////////// BufferedPacket and BufferedPacketFactory implementation /////
344
345#define MAX_PACKET_SIZE 65536
346
347BufferedPacket::BufferedPacket()
348 : fPacketSize(MAX_PACKET_SIZE),
349 fBuf(new unsigned char[MAX_PACKET_SIZE]),
350 fNextPacket(NULL) {
351}
352
353BufferedPacket::~BufferedPacket() {
354 delete fNextPacket;
355 delete[] fBuf;
356}
357
358void BufferedPacket::reset() {
359 fHead = fTail = 0;
360 fUseCount = 0;
361 fIsFirstPacket = False; // by default
362}
363
364// The following function has been deprecated:
365unsigned BufferedPacket
366::nextEnclosedFrameSize(unsigned char*& /*framePtr*/, unsigned dataSize) {
367 // By default, use the entire buffered data, even though it may consist
368 // of more than one frame, on the assumption that the client doesn't
369 // care. (This is more efficient than delivering a frame at a time)
370 return dataSize;
371}
372
373void BufferedPacket
374::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,
375 unsigned& frameSize,
376 unsigned& frameDurationInMicroseconds) {
377 // By default, use the entire buffered data, even though it may consist
378 // of more than one frame, on the assumption that the client doesn't
379 // care. (This is more efficient than delivering a frame at a time)
380
381 // For backwards-compatibility with existing uses of (the now deprecated)
382 // "nextEnclosedFrameSize()", call that function to implement this one:
383 frameSize = nextEnclosedFrameSize(framePtr, dataSize);
384
385 frameDurationInMicroseconds = 0; // by default. Subclasses should correct this.
386}
387
388Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, struct sockaddr_in& fromAddress,
389 Boolean& packetReadWasIncomplete) {
390 if (!packetReadWasIncomplete) reset();
391
392 unsigned const maxBytesToRead = bytesAvailable();
393 if (maxBytesToRead == 0) return False; // exceeded buffer size when reading over TCP
394
395 unsigned numBytesRead;
396 int tcpSocketNum; // not used
397 unsigned char tcpStreamChannelId; // not used
398 if (!rtpInterface.handleRead(&fBuf[fTail], maxBytesToRead,
399 numBytesRead, fromAddress,
400 tcpSocketNum, tcpStreamChannelId,
401 packetReadWasIncomplete)) {
402 return False;
403 }
404 fTail += numBytesRead;
405 return True;
406}
407
408void BufferedPacket
409::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp,
410 struct timeval presentationTime,
411 Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit,
412 struct timeval timeReceived) {
413 fRTPSeqNo = rtpSeqNo;
414 fRTPTimestamp = rtpTimestamp;
415 fPresentationTime = presentationTime;
416 fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP;
417 fRTPMarkerBit = rtpMarkerBit;
418 fTimeReceived = timeReceived;
419}
420
421void BufferedPacket::skip(unsigned numBytes) {
422 fHead += numBytes;
423 if (fHead > fTail) fHead = fTail;
424}
425
426void BufferedPacket::removePadding(unsigned numBytes) {
427 if (numBytes > fTail-fHead) numBytes = fTail-fHead;
428 fTail -= numBytes;
429}
430
431void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) {
432 if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail;
433 memmove(&fBuf[fTail], newData, numBytes);
434 fTail += numBytes;
435}
436
437void BufferedPacket::use(unsigned char* to, unsigned toSize,
438 unsigned& bytesUsed, unsigned& bytesTruncated,
439 unsigned short& rtpSeqNo, unsigned& rtpTimestamp,
440 struct timeval& presentationTime,
441 Boolean& hasBeenSyncedUsingRTCP,
442 Boolean& rtpMarkerBit) {
443 unsigned char* origFramePtr = &fBuf[fHead];
444 unsigned char* newFramePtr = origFramePtr; // may change in the call below
445 unsigned frameSize, frameDurationInMicroseconds;
446 getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,
447 frameSize, frameDurationInMicroseconds);
448 if (frameSize > toSize) {
449 bytesTruncated += frameSize - toSize;
450 bytesUsed = toSize;
451 } else {
452 bytesTruncated = 0;
453 bytesUsed = frameSize;
454 }
455
456 memmove(to, newFramePtr, bytesUsed);
457 fHead += (newFramePtr - origFramePtr) + frameSize;
458 ++fUseCount;
459
460 rtpSeqNo = fRTPSeqNo;
461 rtpTimestamp = fRTPTimestamp;
462 presentationTime = fPresentationTime;
463 hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;
464 rtpMarkerBit = fRTPMarkerBit;
465
466 // Update "fPresentationTime" for the next enclosed frame (if any):
467 fPresentationTime.tv_usec += frameDurationInMicroseconds;
468 if (fPresentationTime.tv_usec >= 1000000) {
469 fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;
470 fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;
471 }
472}
473
474BufferedPacketFactory::BufferedPacketFactory() {
475}
476
477BufferedPacketFactory::~BufferedPacketFactory() {
478}
479
480BufferedPacket* BufferedPacketFactory
481::createNewPacket(MultiFramedRTPSource* /*ourSource*/) {
482 return new BufferedPacket;
483}
484
485
486////////// ReorderingPacketBuffer implementation //////////
487
488ReorderingPacketBuffer
489::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory)
490 : fThresholdTime(100000) /* default reordering threshold: 100 ms */,
491 fHaveSeenFirstPacket(False), fHeadPacket(NULL), fTailPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) {
492 fPacketFactory = (packetFactory == NULL)
493 ? (new BufferedPacketFactory)
494 : packetFactory;
495}
496
497ReorderingPacketBuffer::~ReorderingPacketBuffer() {
498 reset();
499 delete fPacketFactory;
500}
501
502void ReorderingPacketBuffer::reset() {
503 if (fSavedPacketFree) delete fSavedPacket; // because fSavedPacket is not in the list
504 delete fHeadPacket; // will also delete fSavedPacket if it's in the list
505 resetHaveSeenFirstPacket();
506 fHeadPacket = fTailPacket = fSavedPacket = NULL;
507}
508
509BufferedPacket* ReorderingPacketBuffer::getFreePacket(MultiFramedRTPSource* ourSource) {
510 if (fSavedPacket == NULL) { // we're being called for the first time
511 fSavedPacket = fPacketFactory->createNewPacket(ourSource);
512 fSavedPacketFree = True;
513 }
514
515 if (fSavedPacketFree == True) {
516 fSavedPacketFree = False;
517 return fSavedPacket;
518 } else {
519 return fPacketFactory->createNewPacket(ourSource);
520 }
521}
522
523Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {
524 unsigned short rtpSeqNo = bPacket->rtpSeqNo();
525
526 if (!fHaveSeenFirstPacket) {
527 fNextExpectedSeqNo = rtpSeqNo; // initialization
528 bPacket->isFirstPacket() = True;
529 fHaveSeenFirstPacket = True;
530 }
531
532 // Ignore this packet if its sequence number is less than the one
533 // that we're looking for (in this case, it's been excessively delayed).
534 if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False;
535
536 if (fTailPacket == NULL) {
537 // Common case: There are no packets in the queue; this will be the first one:
538 bPacket->nextPacket() = NULL;
539 fHeadPacket = fTailPacket = bPacket;
540 return True;
541 }
542
543 if (seqNumLT(fTailPacket->rtpSeqNo(), rtpSeqNo)) {
544 // The next-most common case: There are packets already in the queue; this packet arrived in order => put it at the tail:
545 bPacket->nextPacket() = NULL;
546 fTailPacket->nextPacket() = bPacket;
547 fTailPacket = bPacket;
548 return True;
549 }
550
551 if (rtpSeqNo == fTailPacket->rtpSeqNo()) {
552 // This is a duplicate packet - ignore it
553 return False;
554 }
555
556 // Rare case: This packet is out-of-order. Run through the list (from the head), to figure out where it belongs:
557 BufferedPacket* beforePtr = NULL;
558 BufferedPacket* afterPtr = fHeadPacket;
559 while (afterPtr != NULL) {
560 if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break; // it comes here
561 if (rtpSeqNo == afterPtr->rtpSeqNo()) {
562 // This is a duplicate packet - ignore it
563 return False;
564 }
565
566 beforePtr = afterPtr;
567 afterPtr = afterPtr->nextPacket();
568 }
569
570 // Link our new packet between "beforePtr" and "afterPtr":
571 bPacket->nextPacket() = afterPtr;
572 if (beforePtr == NULL) {
573 fHeadPacket = bPacket;
574 } else {
575 beforePtr->nextPacket() = bPacket;
576 }
577
578 return True;
579}
580
581void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) {
582 // ASSERT: packet == fHeadPacket
583 // ASSERT: fNextExpectedSeqNo == packet->rtpSeqNo()
584 ++fNextExpectedSeqNo; // because we're finished with this packet now
585
586 fHeadPacket = fHeadPacket->nextPacket();
587 if (!fHeadPacket) {
588 fTailPacket = NULL;
589 }
590 packet->nextPacket() = NULL;
591
592 freePacket(packet);
593}
594
595BufferedPacket* ReorderingPacketBuffer
596::getNextCompletedPacket(Boolean& packetLossPreceded) {
597 if (fHeadPacket == NULL) return NULL;
598
599 // Check whether the next packet we want is already at the head
600 // of the queue:
601 // ASSERT: fHeadPacket->rtpSeqNo() >= fNextExpectedSeqNo
602 if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {
603 packetLossPreceded = fHeadPacket->isFirstPacket();
604 // (The very first packet is treated as if there was packet loss beforehand.)
605 return fHeadPacket;
606 }
607
608 // We're still waiting for our desired packet to arrive. However, if
609 // our time threshold has been exceeded, then forget it, and return
610 // the head packet instead:
611 Boolean timeThresholdHasBeenExceeded;
612 if (fThresholdTime == 0) {
613 timeThresholdHasBeenExceeded = True; // optimization
614 } else {
615 struct timeval timeNow;
616 gettimeofday(&timeNow, NULL);
617 unsigned uSecondsSinceReceived
618 = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000
619 + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);
620 timeThresholdHasBeenExceeded = uSecondsSinceReceived > fThresholdTime;
621 }
622 if (timeThresholdHasBeenExceeded) {
623 fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();
624 // we've given up on earlier packets now
625 packetLossPreceded = True;
626 return fHeadPacket;
627 }
628
629 // Otherwise, keep waiting for our desired packet to arrive:
630 return NULL;
631}
632