1 | /********** |
2 | This library is free software; you can redistribute it and/or modify it under |
3 | the terms of the GNU Lesser General Public License as published by the |
4 | Free Software Foundation; either version 3 of the License, or (at your |
5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
6 | |
7 | This library is distributed in the hope that it will be useful, but WITHOUT |
8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
10 | more details. |
11 | |
12 | You should have received a copy of the GNU Lesser General Public License |
13 | along with this library; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
15 | **********/ |
16 | // "liveMedia" |
17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
18 | // RTP source for a common kind of payload format: Those that pack multiple, |
19 | // complete codec frames (as many as possible) into each RTP packet. |
20 | // Implementation |
21 | |
22 | #include "MultiFramedRTPSource.hh" |
23 | #include "RTCP.hh" |
24 | #include "GroupsockHelper.hh" |
25 | #include <string.h> |
26 | |
27 | ////////// ReorderingPacketBuffer definition ////////// |
28 | |
29 | class ReorderingPacketBuffer { |
30 | public: |
31 | ReorderingPacketBuffer(BufferedPacketFactory* packetFactory); |
32 | virtual ~ReorderingPacketBuffer(); |
33 | void reset(); |
34 | |
35 | BufferedPacket* getFreePacket(MultiFramedRTPSource* ourSource); |
36 | Boolean storePacket(BufferedPacket* bPacket); |
37 | BufferedPacket* getNextCompletedPacket(Boolean& packetLossPreceded); |
38 | void releaseUsedPacket(BufferedPacket* packet); |
39 | void freePacket(BufferedPacket* packet) { |
40 | if (packet != fSavedPacket) { |
41 | delete packet; |
42 | } else { |
43 | fSavedPacketFree = True; |
44 | } |
45 | } |
46 | Boolean isEmpty() const { return fHeadPacket == NULL; } |
47 | |
48 | void setThresholdTime(unsigned uSeconds) { fThresholdTime = uSeconds; } |
49 | void resetHaveSeenFirstPacket() { fHaveSeenFirstPacket = False; } |
50 | |
51 | private: |
52 | BufferedPacketFactory* fPacketFactory; |
53 | unsigned fThresholdTime; // uSeconds |
54 | Boolean fHaveSeenFirstPacket; // used to set initial "fNextExpectedSeqNo" |
55 | unsigned short fNextExpectedSeqNo; |
56 | BufferedPacket* fHeadPacket; |
57 | BufferedPacket* fTailPacket; |
58 | BufferedPacket* fSavedPacket; |
59 | // to avoid calling new/free in the common case |
60 | Boolean fSavedPacketFree; |
61 | }; |
62 | |
63 | |
64 | ////////// MultiFramedRTPSource implementation ////////// |
65 | |
66 | MultiFramedRTPSource |
67 | ::MultiFramedRTPSource(UsageEnvironment& env, Groupsock* RTPgs, |
68 | unsigned char rtpPayloadFormat, |
69 | unsigned rtpTimestampFrequency, |
70 | BufferedPacketFactory* packetFactory) |
71 | : RTPSource(env, RTPgs, rtpPayloadFormat, rtpTimestampFrequency) { |
72 | reset(); |
73 | fReorderingBuffer = new ReorderingPacketBuffer(packetFactory); |
74 | |
75 | // Try to use a big receive buffer for RTP: |
76 | increaseReceiveBufferTo(env, RTPgs->socketNum(), 50*1024); |
77 | } |
78 | |
79 | void MultiFramedRTPSource::reset() { |
80 | fCurrentPacketBeginsFrame = True; // by default |
81 | fCurrentPacketCompletesFrame = True; // by default |
82 | fAreDoingNetworkReads = False; |
83 | fPacketReadInProgress = NULL; |
84 | fNeedDelivery = False; |
85 | fPacketLossInFragmentedFrame = False; |
86 | } |
87 | |
88 | MultiFramedRTPSource::~MultiFramedRTPSource() { |
89 | delete fReorderingBuffer; |
90 | } |
91 | |
92 | Boolean MultiFramedRTPSource |
93 | ::(BufferedPacket* /*packet*/, |
94 | unsigned& ) { |
95 | // Default implementation: Assume no special header: |
96 | resultSpecialHeaderSize = 0; |
97 | return True; |
98 | } |
99 | |
100 | Boolean MultiFramedRTPSource |
101 | ::packetIsUsableInJitterCalculation(unsigned char* /*packet*/, |
102 | unsigned /*packetSize*/) { |
103 | // Default implementation: |
104 | return True; |
105 | } |
106 | |
107 | void MultiFramedRTPSource::doStopGettingFrames() { |
108 | if (fPacketReadInProgress != NULL) { |
109 | fReorderingBuffer->freePacket(fPacketReadInProgress); |
110 | fPacketReadInProgress = NULL; |
111 | } |
112 | envir().taskScheduler().unscheduleDelayedTask(nextTask()); |
113 | fRTPInterface.stopNetworkReading(); |
114 | fReorderingBuffer->reset(); |
115 | reset(); |
116 | } |
117 | |
118 | void MultiFramedRTPSource::doGetNextFrame() { |
119 | if (!fAreDoingNetworkReads) { |
120 | // Turn on background read handling of incoming packets: |
121 | fAreDoingNetworkReads = True; |
122 | TaskScheduler::BackgroundHandlerProc* handler |
123 | = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler; |
124 | fRTPInterface.startNetworkReading(handler); |
125 | } |
126 | |
127 | fSavedTo = fTo; |
128 | fSavedMaxSize = fMaxSize; |
129 | fFrameSize = 0; // for now |
130 | fNeedDelivery = True; |
131 | doGetNextFrame1(); |
132 | } |
133 | |
134 | void MultiFramedRTPSource::doGetNextFrame1() { |
135 | while (fNeedDelivery) { |
136 | // If we already have packet data available, then deliver it now. |
137 | Boolean packetLossPrecededThis; |
138 | BufferedPacket* nextPacket |
139 | = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis); |
140 | if (nextPacket == NULL) break; |
141 | |
142 | fNeedDelivery = False; |
143 | |
144 | if (nextPacket->useCount() == 0) { |
145 | // Before using the packet, check whether it has a special header |
146 | // that needs to be processed: |
147 | unsigned ; |
148 | if (!processSpecialHeader(nextPacket, specialHeaderSize)) { |
149 | // Something's wrong with the header; reject the packet: |
150 | fReorderingBuffer->releaseUsedPacket(nextPacket); |
151 | fNeedDelivery = True; |
152 | continue; |
153 | } |
154 | nextPacket->skip(specialHeaderSize); |
155 | } |
156 | |
157 | // Check whether we're part of a multi-packet frame, and whether |
158 | // there was packet loss that would render this packet unusable: |
159 | if (fCurrentPacketBeginsFrame) { |
160 | if (packetLossPrecededThis || fPacketLossInFragmentedFrame) { |
161 | // We didn't get all of the previous frame. |
162 | // Forget any data that we used from it: |
163 | fTo = fSavedTo; fMaxSize = fSavedMaxSize; |
164 | fFrameSize = 0; |
165 | } |
166 | fPacketLossInFragmentedFrame = False; |
167 | } else if (packetLossPrecededThis) { |
168 | // We're in a multi-packet frame, with preceding packet loss |
169 | fPacketLossInFragmentedFrame = True; |
170 | } |
171 | if (fPacketLossInFragmentedFrame) { |
172 | // This packet is unusable; reject it: |
173 | fReorderingBuffer->releaseUsedPacket(nextPacket); |
174 | fNeedDelivery = True; |
175 | continue; |
176 | } |
177 | |
178 | // The packet is usable. Deliver all or part of it to our caller: |
179 | unsigned frameSize; |
180 | nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes, |
181 | fCurPacketRTPSeqNum, fCurPacketRTPTimestamp, |
182 | fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP, |
183 | fCurPacketMarkerBit); |
184 | fFrameSize += frameSize; |
185 | |
186 | if (!nextPacket->hasUsableData()) { |
187 | // We're completely done with this packet now |
188 | fReorderingBuffer->releaseUsedPacket(nextPacket); |
189 | } |
190 | |
191 | if (fCurrentPacketCompletesFrame && fFrameSize > 0) { |
192 | // We have all the data that the client wants. |
193 | if (fNumTruncatedBytes > 0) { |
194 | envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size (" |
195 | << fSavedMaxSize << "). " |
196 | << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n" ; |
197 | } |
198 | // Call our own 'after getting' function, so that the downstream object can consume the data: |
199 | if (fReorderingBuffer->isEmpty()) { |
200 | // Common case optimization: There are no more queued incoming packets, so this code will not get |
201 | // executed again without having first returned to the event loop. Call our 'after getting' function |
202 | // directly, because there's no risk of a long chain of recursion (and thus stack overflow): |
203 | afterGetting(this); |
204 | } else { |
205 | // Special case: Call our 'after getting' function via the event loop. |
206 | nextTask() = envir().taskScheduler().scheduleDelayedTask(0, |
207 | (TaskFunc*)FramedSource::afterGetting, this); |
208 | } |
209 | } else { |
210 | // This packet contained fragmented data, and does not complete |
211 | // the data that the client wants. Keep getting data: |
212 | fTo += frameSize; fMaxSize -= frameSize; |
213 | fNeedDelivery = True; |
214 | } |
215 | } |
216 | } |
217 | |
218 | void MultiFramedRTPSource |
219 | ::setPacketReorderingThresholdTime(unsigned uSeconds) { |
220 | fReorderingBuffer->setThresholdTime(uSeconds); |
221 | } |
222 | |
// Skips over the next "n" bytes of the packet named "bPacket" in the enclosing scope:
#define ADVANCE(n) do { bPacket->skip((n)); } while (0)
224 | |
225 | void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/) { |
226 | source->networkReadHandler1(); |
227 | } |
228 | |
229 | void MultiFramedRTPSource::networkReadHandler1() { |
230 | BufferedPacket* bPacket = fPacketReadInProgress; |
231 | if (bPacket == NULL) { |
232 | // Normal case: Get a free BufferedPacket descriptor to hold the new network packet: |
233 | bPacket = fReorderingBuffer->getFreePacket(this); |
234 | } |
235 | |
236 | // Read the network packet, and perform sanity checks on the RTP header: |
237 | Boolean readSuccess = False; |
238 | do { |
239 | struct sockaddr_in fromAddress; |
240 | Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL; |
241 | if (!bPacket->fillInData(fRTPInterface, fromAddress, packetReadWasIncomplete)) { |
242 | if (bPacket->bytesAvailable() == 0) { // should not happen?? |
243 | envir() << "MultiFramedRTPSource internal error: Hit limit when reading incoming packet over TCP\n" ; |
244 | } |
245 | fPacketReadInProgress = NULL; |
246 | break; |
247 | } |
248 | if (packetReadWasIncomplete) { |
249 | // We need additional read(s) before we can process the incoming packet: |
250 | fPacketReadInProgress = bPacket; |
251 | return; |
252 | } else { |
253 | fPacketReadInProgress = NULL; |
254 | } |
255 | #ifdef TEST_LOSS |
256 | setPacketReorderingThresholdTime(0); |
257 | // don't wait for 'lost' packets to arrive out-of-order later |
258 | if ((our_random()%10) == 0) break; // simulate 10% packet loss |
259 | #endif |
260 | |
261 | // Check for the 12-byte RTP header: |
262 | if (bPacket->dataSize() < 12) break; |
263 | unsigned rtpHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4); |
264 | Boolean rtpMarkerBit = (rtpHdr&0x00800000) != 0; |
265 | unsigned rtpTimestamp = ntohl(*(u_int32_t*)(bPacket->data()));ADVANCE(4); |
266 | unsigned rtpSSRC = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4); |
267 | |
268 | // Check the RTP version number (it should be 2): |
269 | if ((rtpHdr&0xC0000000) != 0x80000000) break; |
270 | |
271 | // Check the Payload Type. |
272 | unsigned char rtpPayloadType = (unsigned char)((rtpHdr&0x007F0000)>>16); |
273 | if (rtpPayloadType != rtpPayloadFormat()) { |
274 | if (fRTCPInstanceForMultiplexedRTCPPackets != NULL |
275 | && rtpPayloadType >= 64 && rtpPayloadType <= 95) { |
276 | // This is a multiplexed RTCP packet, and we've been asked to deliver such packets. |
277 | // Do so now: |
278 | fRTCPInstanceForMultiplexedRTCPPackets |
279 | ->injectReport(bPacket->data()-12, bPacket->dataSize()+12, fromAddress); |
280 | } |
281 | break; |
282 | } |
283 | |
284 | // Skip over any CSRC identifiers in the header: |
285 | unsigned cc = (rtpHdr>>24)&0x0F; |
286 | if (bPacket->dataSize() < cc*4) break; |
287 | ADVANCE(cc*4); |
288 | |
289 | // Check for (& ignore) any RTP header extension |
290 | if (rtpHdr&0x10000000) { |
291 | if (bPacket->dataSize() < 4) break; |
292 | unsigned extHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4); |
293 | unsigned remExtSize = 4*(extHdr&0xFFFF); |
294 | if (bPacket->dataSize() < remExtSize) break; |
295 | ADVANCE(remExtSize); |
296 | } |
297 | |
298 | // Discard any padding bytes: |
299 | if (rtpHdr&0x20000000) { |
300 | if (bPacket->dataSize() == 0) break; |
301 | unsigned numPaddingBytes |
302 | = (unsigned)(bPacket->data())[bPacket->dataSize()-1]; |
303 | if (bPacket->dataSize() < numPaddingBytes) break; |
304 | bPacket->removePadding(numPaddingBytes); |
305 | } |
306 | |
307 | // The rest of the packet is the usable data. Record and save it: |
308 | if (rtpSSRC != fLastReceivedSSRC) { |
309 | // The SSRC of incoming packets has changed. Unfortunately we don't yet handle streams that contain multiple SSRCs, |
310 | // but we can handle a single-SSRC stream where the SSRC changes occasionally: |
311 | fLastReceivedSSRC = rtpSSRC; |
312 | fReorderingBuffer->resetHaveSeenFirstPacket(); |
313 | } |
314 | unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF); |
315 | Boolean usableInJitterCalculation |
316 | = packetIsUsableInJitterCalculation((bPacket->data()), |
317 | bPacket->dataSize()); |
318 | struct timeval presentationTime; // computed by: |
319 | Boolean hasBeenSyncedUsingRTCP; // computed by: |
320 | receptionStatsDB() |
321 | .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp, |
322 | timestampFrequency(), |
323 | usableInJitterCalculation, presentationTime, |
324 | hasBeenSyncedUsingRTCP, bPacket->dataSize()); |
325 | |
326 | // Fill in the rest of the packet descriptor, and store it: |
327 | struct timeval timeNow; |
328 | gettimeofday(&timeNow, NULL); |
329 | bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime, |
330 | hasBeenSyncedUsingRTCP, rtpMarkerBit, |
331 | timeNow); |
332 | if (!fReorderingBuffer->storePacket(bPacket)) break; |
333 | |
334 | readSuccess = True; |
335 | } while (0); |
336 | if (!readSuccess) fReorderingBuffer->freePacket(bPacket); |
337 | |
338 | doGetNextFrame1(); |
339 | // If we didn't get proper data this time, we'll get another chance |
340 | } |
341 | |
342 | |
343 | ////////// BufferedPacket and BufferedPacketFactory implementation ///// |
344 | |
// The size (in bytes) of the data buffer that each "BufferedPacket" allocates:
#define MAX_PACKET_SIZE (64*1024)
346 | |
347 | BufferedPacket::BufferedPacket() |
348 | : fPacketSize(MAX_PACKET_SIZE), |
349 | fBuf(new unsigned char[MAX_PACKET_SIZE]), |
350 | fNextPacket(NULL) { |
351 | } |
352 | |
353 | BufferedPacket::~BufferedPacket() { |
354 | delete fNextPacket; |
355 | delete[] fBuf; |
356 | } |
357 | |
358 | void BufferedPacket::reset() { |
359 | fHead = fTail = 0; |
360 | fUseCount = 0; |
361 | fIsFirstPacket = False; // by default |
362 | } |
363 | |
364 | // The following function has been deprecated: |
365 | unsigned BufferedPacket |
366 | ::nextEnclosedFrameSize(unsigned char*& /*framePtr*/, unsigned dataSize) { |
367 | // By default, use the entire buffered data, even though it may consist |
368 | // of more than one frame, on the assumption that the client doesn't |
369 | // care. (This is more efficient than delivering a frame at a time) |
370 | return dataSize; |
371 | } |
372 | |
373 | void BufferedPacket |
374 | ::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize, |
375 | unsigned& frameSize, |
376 | unsigned& frameDurationInMicroseconds) { |
377 | // By default, use the entire buffered data, even though it may consist |
378 | // of more than one frame, on the assumption that the client doesn't |
379 | // care. (This is more efficient than delivering a frame at a time) |
380 | |
381 | // For backwards-compatibility with existing uses of (the now deprecated) |
382 | // "nextEnclosedFrameSize()", call that function to implement this one: |
383 | frameSize = nextEnclosedFrameSize(framePtr, dataSize); |
384 | |
385 | frameDurationInMicroseconds = 0; // by default. Subclasses should correct this. |
386 | } |
387 | |
388 | Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, struct sockaddr_in& fromAddress, |
389 | Boolean& packetReadWasIncomplete) { |
390 | if (!packetReadWasIncomplete) reset(); |
391 | |
392 | unsigned const maxBytesToRead = bytesAvailable(); |
393 | if (maxBytesToRead == 0) return False; // exceeded buffer size when reading over TCP |
394 | |
395 | unsigned numBytesRead; |
396 | int tcpSocketNum; // not used |
397 | unsigned char tcpStreamChannelId; // not used |
398 | if (!rtpInterface.handleRead(&fBuf[fTail], maxBytesToRead, |
399 | numBytesRead, fromAddress, |
400 | tcpSocketNum, tcpStreamChannelId, |
401 | packetReadWasIncomplete)) { |
402 | return False; |
403 | } |
404 | fTail += numBytesRead; |
405 | return True; |
406 | } |
407 | |
408 | void BufferedPacket |
409 | ::assignMiscParams(unsigned short rtpSeqNo, unsigned rtpTimestamp, |
410 | struct timeval presentationTime, |
411 | Boolean hasBeenSyncedUsingRTCP, Boolean rtpMarkerBit, |
412 | struct timeval timeReceived) { |
413 | fRTPSeqNo = rtpSeqNo; |
414 | fRTPTimestamp = rtpTimestamp; |
415 | fPresentationTime = presentationTime; |
416 | fHasBeenSyncedUsingRTCP = hasBeenSyncedUsingRTCP; |
417 | fRTPMarkerBit = rtpMarkerBit; |
418 | fTimeReceived = timeReceived; |
419 | } |
420 | |
421 | void BufferedPacket::skip(unsigned numBytes) { |
422 | fHead += numBytes; |
423 | if (fHead > fTail) fHead = fTail; |
424 | } |
425 | |
426 | void BufferedPacket::removePadding(unsigned numBytes) { |
427 | if (numBytes > fTail-fHead) numBytes = fTail-fHead; |
428 | fTail -= numBytes; |
429 | } |
430 | |
431 | void BufferedPacket::appendData(unsigned char* newData, unsigned numBytes) { |
432 | if (numBytes > fPacketSize-fTail) numBytes = fPacketSize - fTail; |
433 | memmove(&fBuf[fTail], newData, numBytes); |
434 | fTail += numBytes; |
435 | } |
436 | |
437 | void BufferedPacket::use(unsigned char* to, unsigned toSize, |
438 | unsigned& bytesUsed, unsigned& bytesTruncated, |
439 | unsigned short& rtpSeqNo, unsigned& rtpTimestamp, |
440 | struct timeval& presentationTime, |
441 | Boolean& hasBeenSyncedUsingRTCP, |
442 | Boolean& rtpMarkerBit) { |
443 | unsigned char* origFramePtr = &fBuf[fHead]; |
444 | unsigned char* newFramePtr = origFramePtr; // may change in the call below |
445 | unsigned frameSize, frameDurationInMicroseconds; |
446 | getNextEnclosedFrameParameters(newFramePtr, fTail - fHead, |
447 | frameSize, frameDurationInMicroseconds); |
448 | if (frameSize > toSize) { |
449 | bytesTruncated += frameSize - toSize; |
450 | bytesUsed = toSize; |
451 | } else { |
452 | bytesTruncated = 0; |
453 | bytesUsed = frameSize; |
454 | } |
455 | |
456 | memmove(to, newFramePtr, bytesUsed); |
457 | fHead += (newFramePtr - origFramePtr) + frameSize; |
458 | ++fUseCount; |
459 | |
460 | rtpSeqNo = fRTPSeqNo; |
461 | rtpTimestamp = fRTPTimestamp; |
462 | presentationTime = fPresentationTime; |
463 | hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP; |
464 | rtpMarkerBit = fRTPMarkerBit; |
465 | |
466 | // Update "fPresentationTime" for the next enclosed frame (if any): |
467 | fPresentationTime.tv_usec += frameDurationInMicroseconds; |
468 | if (fPresentationTime.tv_usec >= 1000000) { |
469 | fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000; |
470 | fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000; |
471 | } |
472 | } |
473 | |
474 | BufferedPacketFactory::BufferedPacketFactory() { |
475 | } |
476 | |
477 | BufferedPacketFactory::~BufferedPacketFactory() { |
478 | } |
479 | |
480 | BufferedPacket* BufferedPacketFactory |
481 | ::createNewPacket(MultiFramedRTPSource* /*ourSource*/) { |
482 | return new BufferedPacket; |
483 | } |
484 | |
485 | |
486 | ////////// ReorderingPacketBuffer implementation ////////// |
487 | |
488 | ReorderingPacketBuffer |
489 | ::ReorderingPacketBuffer(BufferedPacketFactory* packetFactory) |
490 | : fThresholdTime(100000) /* default reordering threshold: 100 ms */, |
491 | fHaveSeenFirstPacket(False), fHeadPacket(NULL), fTailPacket(NULL), fSavedPacket(NULL), fSavedPacketFree(True) { |
492 | fPacketFactory = (packetFactory == NULL) |
493 | ? (new BufferedPacketFactory) |
494 | : packetFactory; |
495 | } |
496 | |
497 | ReorderingPacketBuffer::~ReorderingPacketBuffer() { |
498 | reset(); |
499 | delete fPacketFactory; |
500 | } |
501 | |
502 | void ReorderingPacketBuffer::reset() { |
503 | if (fSavedPacketFree) delete fSavedPacket; // because fSavedPacket is not in the list |
504 | delete fHeadPacket; // will also delete fSavedPacket if it's in the list |
505 | resetHaveSeenFirstPacket(); |
506 | fHeadPacket = fTailPacket = fSavedPacket = NULL; |
507 | } |
508 | |
509 | BufferedPacket* ReorderingPacketBuffer::getFreePacket(MultiFramedRTPSource* ourSource) { |
510 | if (fSavedPacket == NULL) { // we're being called for the first time |
511 | fSavedPacket = fPacketFactory->createNewPacket(ourSource); |
512 | fSavedPacketFree = True; |
513 | } |
514 | |
515 | if (fSavedPacketFree == True) { |
516 | fSavedPacketFree = False; |
517 | return fSavedPacket; |
518 | } else { |
519 | return fPacketFactory->createNewPacket(ourSource); |
520 | } |
521 | } |
522 | |
523 | Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) { |
524 | unsigned short rtpSeqNo = bPacket->rtpSeqNo(); |
525 | |
526 | if (!fHaveSeenFirstPacket) { |
527 | fNextExpectedSeqNo = rtpSeqNo; // initialization |
528 | bPacket->isFirstPacket() = True; |
529 | fHaveSeenFirstPacket = True; |
530 | } |
531 | |
532 | // Ignore this packet if its sequence number is less than the one |
533 | // that we're looking for (in this case, it's been excessively delayed). |
534 | if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False; |
535 | |
536 | if (fTailPacket == NULL) { |
537 | // Common case: There are no packets in the queue; this will be the first one: |
538 | bPacket->nextPacket() = NULL; |
539 | fHeadPacket = fTailPacket = bPacket; |
540 | return True; |
541 | } |
542 | |
543 | if (seqNumLT(fTailPacket->rtpSeqNo(), rtpSeqNo)) { |
544 | // The next-most common case: There are packets already in the queue; this packet arrived in order => put it at the tail: |
545 | bPacket->nextPacket() = NULL; |
546 | fTailPacket->nextPacket() = bPacket; |
547 | fTailPacket = bPacket; |
548 | return True; |
549 | } |
550 | |
551 | if (rtpSeqNo == fTailPacket->rtpSeqNo()) { |
552 | // This is a duplicate packet - ignore it |
553 | return False; |
554 | } |
555 | |
556 | // Rare case: This packet is out-of-order. Run through the list (from the head), to figure out where it belongs: |
557 | BufferedPacket* beforePtr = NULL; |
558 | BufferedPacket* afterPtr = fHeadPacket; |
559 | while (afterPtr != NULL) { |
560 | if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break; // it comes here |
561 | if (rtpSeqNo == afterPtr->rtpSeqNo()) { |
562 | // This is a duplicate packet - ignore it |
563 | return False; |
564 | } |
565 | |
566 | beforePtr = afterPtr; |
567 | afterPtr = afterPtr->nextPacket(); |
568 | } |
569 | |
570 | // Link our new packet between "beforePtr" and "afterPtr": |
571 | bPacket->nextPacket() = afterPtr; |
572 | if (beforePtr == NULL) { |
573 | fHeadPacket = bPacket; |
574 | } else { |
575 | beforePtr->nextPacket() = bPacket; |
576 | } |
577 | |
578 | return True; |
579 | } |
580 | |
581 | void ReorderingPacketBuffer::releaseUsedPacket(BufferedPacket* packet) { |
582 | // ASSERT: packet == fHeadPacket |
583 | // ASSERT: fNextExpectedSeqNo == packet->rtpSeqNo() |
584 | ++fNextExpectedSeqNo; // because we're finished with this packet now |
585 | |
586 | fHeadPacket = fHeadPacket->nextPacket(); |
587 | if (!fHeadPacket) { |
588 | fTailPacket = NULL; |
589 | } |
590 | packet->nextPacket() = NULL; |
591 | |
592 | freePacket(packet); |
593 | } |
594 | |
595 | BufferedPacket* ReorderingPacketBuffer |
596 | ::getNextCompletedPacket(Boolean& packetLossPreceded) { |
597 | if (fHeadPacket == NULL) return NULL; |
598 | |
599 | // Check whether the next packet we want is already at the head |
600 | // of the queue: |
601 | // ASSERT: fHeadPacket->rtpSeqNo() >= fNextExpectedSeqNo |
602 | if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) { |
603 | packetLossPreceded = fHeadPacket->isFirstPacket(); |
604 | // (The very first packet is treated as if there was packet loss beforehand.) |
605 | return fHeadPacket; |
606 | } |
607 | |
608 | // We're still waiting for our desired packet to arrive. However, if |
609 | // our time threshold has been exceeded, then forget it, and return |
610 | // the head packet instead: |
611 | Boolean timeThresholdHasBeenExceeded; |
612 | if (fThresholdTime == 0) { |
613 | timeThresholdHasBeenExceeded = True; // optimization |
614 | } else { |
615 | struct timeval timeNow; |
616 | gettimeofday(&timeNow, NULL); |
617 | unsigned uSecondsSinceReceived |
618 | = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000 |
619 | + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec); |
620 | timeThresholdHasBeenExceeded = uSecondsSinceReceived > fThresholdTime; |
621 | } |
622 | if (timeThresholdHasBeenExceeded) { |
623 | fNextExpectedSeqNo = fHeadPacket->rtpSeqNo(); |
624 | // we've given up on earlier packets now |
625 | packetLossPreceded = True; |
626 | return fHeadPacket; |
627 | } |
628 | |
629 | // Otherwise, keep waiting for our desired packet to arrive: |
630 | return NULL; |
631 | } |
632 | |