| 1 | /********** |
| 2 | This library is free software; you can redistribute it and/or modify it under |
| 3 | the terms of the GNU Lesser General Public License as published by the |
| 4 | Free Software Foundation; either version 3 of the License, or (at your |
| 5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
| 6 | |
| 7 | This library is distributed in the hope that it will be useful, but WITHOUT |
| 8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
| 10 | more details. |
| 11 | |
| 12 | You should have received a copy of the GNU Lesser General Public License |
| 13 | along with this library; if not, write to the Free Software Foundation, Inc., |
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 15 | **********/ |
| 16 | // "liveMedia" |
| 17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
| 18 | // A filter that passes through (unchanged) chunks that contain an integral number |
| 19 | // of MPEG-2 Transport Stream packets, but returning (in "fDurationInMicroseconds") |
| 20 | // an updated estimate of the time gap between chunks. |
| 21 | // Implementation |
| 22 | |
| 23 | #include "MPEG2TransportStreamFramer.hh" |
| 24 | #include <GroupsockHelper.hh> // for "gettimeofday()" |
| 25 | |
// The size (in bytes) of a single Transport Stream packet.  Input chunks are
// trimmed to a multiple of this size before being delivered.
#define TRANSPORT_PACKET_SIZE 188

////////// Definitions of constants that control the behavior of this code /////////

#if !defined(NEW_DURATION_WEIGHT)
#define NEW_DURATION_WEIGHT 0.5
// How much weight to give to the latest duration measurement (must be <= 1)
#endif

#if !defined(TIME_ADJUSTMENT_FACTOR)
#define TIME_ADJUSTMENT_FACTOR 0.8
// A factor by which to adjust the duration estimate to ensure that the overall
// packet transmission times remain matched with the PCR times (which will be the
// times that we expect receivers to play the incoming packets).
// (must be <= 1)
#endif

#if !defined(MAX_PLAYOUT_BUFFER_DURATION)
#define MAX_PLAYOUT_BUFFER_DURATION 0.1 // (seconds)
// How far the PCR-based playout duration may run ahead of the real (wall-clock)
// transmission duration before the per-packet duration estimate is increased.
#endif

#if !defined(PCR_PERIOD_VARIATION_RATIO)
#define PCR_PERIOD_VARIATION_RATIO 0.5
// A PCR that arrives fewer than (mean packets-per-PCR * this ratio) packets
// after the previously recorded PCR for its PID is not used to update the
// duration estimate.
#endif
| 50 | |
| 51 | ////////// PIDStatus ////////// |
| 52 | |
// Per-PID record of PCR observations.  Each "clock" value is a PCR converted to
// seconds; each "real time" value is the wall-clock time passed in alongside it.
class PIDStatus {
public:
  // Starts a record from a first observation: the "first" and "last" values
  // start out identical, and no packet number has been recorded yet.
  PIDStatus(double _firstClock, double _firstRealTime) {
    firstClock = lastClock = _firstClock;
    firstRealTime = lastRealTime = _firstRealTime;
    lastPacketNum = 0;
  }

  double firstClock;    // clock value at the start of the current measurement span
  double lastClock;     // clock value of the most recently recorded observation
  double firstRealTime; // wall-clock time paired with "firstClock"
  double lastRealTime;  // wall-clock time paired with "lastClock"
  u_int64_t lastPacketNum; // packet counter value paired with "lastClock"
};
| 64 | |
| 65 | |
| 66 | ////////// MPEG2TransportStreamFramer ////////// |
| 67 | |
| 68 | MPEG2TransportStreamFramer* MPEG2TransportStreamFramer |
| 69 | ::createNew(UsageEnvironment& env, FramedSource* inputSource) { |
| 70 | return new MPEG2TransportStreamFramer(env, inputSource); |
| 71 | } |
| 72 | |
| 73 | MPEG2TransportStreamFramer |
| 74 | ::MPEG2TransportStreamFramer(UsageEnvironment& env, FramedSource* inputSource) |
| 75 | : FramedFilter(env, inputSource), |
| 76 | fTSPacketCount(0), fTSPacketDurationEstimate(0.0), fTSPCRCount(0), |
| 77 | fLimitNumTSPacketsToStream(False), fNumTSPacketsToStream(0), |
| 78 | fLimitTSPacketsToStreamByPCR(False), fPCRLimit(0.0) { |
| 79 | fPIDStatusTable = HashTable::create(ONE_WORD_HASH_KEYS); |
| 80 | } |
| 81 | |
| 82 | MPEG2TransportStreamFramer::~MPEG2TransportStreamFramer() { |
| 83 | clearPIDStatusTable(); |
| 84 | delete fPIDStatusTable; |
| 85 | } |
| 86 | |
| 87 | void MPEG2TransportStreamFramer::clearPIDStatusTable() { |
| 88 | PIDStatus* pidStatus; |
| 89 | while ((pidStatus = (PIDStatus*)fPIDStatusTable->RemoveNext()) != NULL) { |
| 90 | delete pidStatus; |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | void MPEG2TransportStreamFramer::setNumTSPacketsToStream(unsigned long numTSRecordsToStream) { |
| 95 | fNumTSPacketsToStream = numTSRecordsToStream; |
| 96 | fLimitNumTSPacketsToStream = numTSRecordsToStream > 0; |
| 97 | } |
| 98 | |
| 99 | void MPEG2TransportStreamFramer::setPCRLimit(float pcrLimit) { |
| 100 | fPCRLimit = pcrLimit; |
| 101 | fLimitTSPacketsToStreamByPCR = pcrLimit != 0.0; |
| 102 | } |
| 103 | |
| 104 | void MPEG2TransportStreamFramer::doGetNextFrame() { |
| 105 | if (fLimitNumTSPacketsToStream) { |
| 106 | if (fNumTSPacketsToStream == 0) { |
| 107 | handleClosure(); |
| 108 | return; |
| 109 | } |
| 110 | if (fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE < fMaxSize) { |
| 111 | fMaxSize = fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE; |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | // Read directly from our input source into our client's buffer: |
| 116 | fFrameSize = 0; |
| 117 | fInputSource->getNextFrame(fTo, fMaxSize, |
| 118 | afterGettingFrame, this, |
| 119 | FramedSource::handleClosure, this); |
| 120 | } |
| 121 | |
| 122 | void MPEG2TransportStreamFramer::doStopGettingFrames() { |
| 123 | FramedFilter::doStopGettingFrames(); |
| 124 | fTSPacketCount = 0; |
| 125 | fTSPCRCount = 0; |
| 126 | |
| 127 | clearPIDStatusTable(); |
| 128 | } |
| 129 | |
| 130 | void MPEG2TransportStreamFramer |
| 131 | ::afterGettingFrame(void* clientData, unsigned frameSize, |
| 132 | unsigned /*numTruncatedBytes*/, |
| 133 | struct timeval presentationTime, |
| 134 | unsigned /*durationInMicroseconds*/) { |
| 135 | MPEG2TransportStreamFramer* framer = (MPEG2TransportStreamFramer*)clientData; |
| 136 | framer->afterGettingFrame1(frameSize, presentationTime); |
| 137 | } |
| 138 | |
// The value that the first byte of each Transport Stream packet is checked against:
#define TRANSPORT_SYNC_BYTE 0x47
| 140 | |
| 141 | void MPEG2TransportStreamFramer::afterGettingFrame1(unsigned frameSize, |
| 142 | struct timeval presentationTime) { |
| 143 | fFrameSize += frameSize; |
| 144 | unsigned const numTSPackets = fFrameSize/TRANSPORT_PACKET_SIZE; |
| 145 | fNumTSPacketsToStream -= numTSPackets; |
| 146 | fFrameSize = numTSPackets*TRANSPORT_PACKET_SIZE; // an integral # of TS packets |
| 147 | if (fFrameSize == 0) { |
| 148 | // We didn't read a complete TS packet; assume that the input source has closed. |
| 149 | handleClosure(); |
| 150 | return; |
| 151 | } |
| 152 | |
| 153 | // Make sure the data begins with a sync byte: |
| 154 | unsigned syncBytePosition; |
| 155 | for (syncBytePosition = 0; syncBytePosition < fFrameSize; ++syncBytePosition) { |
| 156 | if (fTo[syncBytePosition] == TRANSPORT_SYNC_BYTE) break; |
| 157 | } |
| 158 | if (syncBytePosition == fFrameSize) { |
| 159 | envir() << "No Transport Stream sync byte in data." ; |
| 160 | handleClosure(); |
| 161 | return; |
| 162 | } else if (syncBytePosition > 0) { |
| 163 | // There's a sync byte, but not at the start of the data. Move the good data |
| 164 | // to the start of the buffer, then read more to fill it up again: |
| 165 | memmove(fTo, &fTo[syncBytePosition], fFrameSize - syncBytePosition); |
| 166 | fFrameSize -= syncBytePosition; |
| 167 | fInputSource->getNextFrame(&fTo[fFrameSize], syncBytePosition, |
| 168 | afterGettingFrame, this, |
| 169 | FramedSource::handleClosure, this); |
| 170 | return; |
| 171 | } // else normal case: the data begins with a sync byte |
| 172 | |
| 173 | fPresentationTime = presentationTime; |
| 174 | |
| 175 | // Scan through the TS packets that we read, and update our estimate of |
| 176 | // the duration of each packet: |
| 177 | struct timeval tvNow; |
| 178 | gettimeofday(&tvNow, NULL); |
| 179 | double timeNow = tvNow.tv_sec + tvNow.tv_usec/1000000.0; |
| 180 | for (unsigned i = 0; i < numTSPackets; ++i) { |
| 181 | if (!updateTSPacketDurationEstimate(&fTo[i*TRANSPORT_PACKET_SIZE], timeNow)) { |
| 182 | // We hit a preset limit (based on PCR) within the stream. Handle this as if the input source has closed: |
| 183 | handleClosure(); |
| 184 | return; |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | fDurationInMicroseconds |
| 189 | = numTSPackets * (unsigned)(fTSPacketDurationEstimate*1000000); |
| 190 | |
| 191 | // Complete the delivery to our client: |
| 192 | afterGetting(this); |
| 193 | } |
| 194 | |
| 195 | Boolean MPEG2TransportStreamFramer::updateTSPacketDurationEstimate(unsigned char* pkt, double timeNow) { |
| 196 | // Sanity check: Make sure we start with the sync byte: |
| 197 | if (pkt[0] != TRANSPORT_SYNC_BYTE) { |
| 198 | envir() << "Missing sync byte!\n" ; |
| 199 | return True; |
| 200 | } |
| 201 | |
| 202 | ++fTSPacketCount; |
| 203 | |
| 204 | // If this packet doesn't contain a PCR, then we're not interested in it: |
| 205 | u_int8_t const adaptation_field_control = (pkt[3]&0x30)>>4; |
| 206 | if (adaptation_field_control != 2 && adaptation_field_control != 3) return True; |
| 207 | // there's no adaptation_field |
| 208 | |
| 209 | u_int8_t const adaptation_field_length = pkt[4]; |
| 210 | if (adaptation_field_length == 0) return True; |
| 211 | |
| 212 | u_int8_t const discontinuity_indicator = pkt[5]&0x80; |
| 213 | u_int8_t const pcrFlag = pkt[5]&0x10; |
| 214 | if (pcrFlag == 0) return True; // no PCR |
| 215 | |
| 216 | // There's a PCR. Get it, and the PID: |
| 217 | ++fTSPCRCount; |
| 218 | u_int32_t pcrBaseHigh = (pkt[6]<<24)|(pkt[7]<<16)|(pkt[8]<<8)|pkt[9]; |
| 219 | double clock = pcrBaseHigh/45000.0; |
| 220 | if ((pkt[10]&0x80) != 0) clock += 1/90000.0; // add in low-bit (if set) |
| 221 | unsigned short pcrExt = ((pkt[10]&0x01)<<8) | pkt[11]; |
| 222 | clock += pcrExt/27000000.0; |
| 223 | if (fLimitTSPacketsToStreamByPCR) { |
| 224 | if (clock > fPCRLimit) { |
| 225 | // We've hit a preset limit within the stream: |
| 226 | return False; |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | unsigned pid = ((pkt[1]&0x1F)<<8) | pkt[2]; |
| 231 | |
| 232 | // Check whether we already have a record of a PCR for this PID: |
| 233 | PIDStatus* pidStatus = (PIDStatus*)(fPIDStatusTable->Lookup((char*)pid)); |
| 234 | |
| 235 | if (pidStatus == NULL) { |
| 236 | // We're seeing this PID's PCR for the first time: |
| 237 | pidStatus = new PIDStatus(clock, timeNow); |
| 238 | fPIDStatusTable->Add((char*)pid, pidStatus); |
| 239 | #ifdef DEBUG_PCR |
| 240 | fprintf(stderr, "PID 0x%x, FIRST PCR 0x%08x+%d:%03x == %f @ %f, pkt #%lu\n" , pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, fTSPacketCount); |
| 241 | #endif |
| 242 | } else { |
| 243 | // We've seen this PID's PCR before; update our per-packet duration estimate: |
| 244 | int64_t packetsSinceLast = (int64_t)(fTSPacketCount - pidStatus->lastPacketNum); |
| 245 | // it's "int64_t" because some compilers can't convert "u_int64_t" -> "double" |
| 246 | double durationPerPacket = (clock - pidStatus->lastClock)/packetsSinceLast; |
| 247 | |
| 248 | // Hack (suggested by "Romain"): Don't update our estimate if this PCR appeared unusually quickly. |
| 249 | // (This can produce more accurate estimates for wildly VBR streams.) |
| 250 | double meanPCRPeriod = 0.0; |
| 251 | if (fTSPCRCount > 0) { |
| 252 | double tsPacketCount = (double)(int64_t)fTSPacketCount; |
| 253 | double tsPCRCount = (double)(int64_t)fTSPCRCount; |
| 254 | meanPCRPeriod = tsPacketCount/tsPCRCount; |
| 255 | if (packetsSinceLast < meanPCRPeriod*PCR_PERIOD_VARIATION_RATIO) return True; |
| 256 | } |
| 257 | |
| 258 | if (fTSPacketDurationEstimate == 0.0) { // we've just started |
| 259 | fTSPacketDurationEstimate = durationPerPacket; |
| 260 | } else if (discontinuity_indicator == 0 && durationPerPacket >= 0.0) { |
| 261 | fTSPacketDurationEstimate |
| 262 | = durationPerPacket*NEW_DURATION_WEIGHT |
| 263 | + fTSPacketDurationEstimate*(1-NEW_DURATION_WEIGHT); |
| 264 | |
| 265 | // Also adjust the duration estimate to try to ensure that the transmission |
| 266 | // rate matches the playout rate: |
| 267 | double transmitDuration = timeNow - pidStatus->firstRealTime; |
| 268 | double playoutDuration = clock - pidStatus->firstClock; |
| 269 | if (transmitDuration > playoutDuration) { |
| 270 | fTSPacketDurationEstimate *= TIME_ADJUSTMENT_FACTOR; // reduce estimate |
| 271 | } else if (transmitDuration + MAX_PLAYOUT_BUFFER_DURATION < playoutDuration) { |
| 272 | fTSPacketDurationEstimate /= TIME_ADJUSTMENT_FACTOR; // increase estimate |
| 273 | } |
| 274 | } else { |
| 275 | // the PCR has a discontinuity from its previous value; don't use it now, |
| 276 | // but reset our PCR and real-time values to compensate: |
| 277 | pidStatus->firstClock = clock; |
| 278 | pidStatus->firstRealTime = timeNow; |
| 279 | } |
| 280 | #ifdef DEBUG_PCR |
| 281 | fprintf(stderr, "PID 0x%x, PCR 0x%08x+%d:%03x == %f @ %f (diffs %f @ %f), pkt #%lu, discon %d => this duration %f, new estimate %f, mean PCR period=%f\n" , pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, clock - pidStatus->firstClock, timeNow - pidStatus->firstRealTime, fTSPacketCount, discontinuity_indicator != 0, durationPerPacket, fTSPacketDurationEstimate, meanPCRPeriod ); |
| 282 | #endif |
| 283 | } |
| 284 | |
| 285 | pidStatus->lastClock = clock; |
| 286 | pidStatus->lastRealTime = timeNow; |
| 287 | pidStatus->lastPacketNum = fTSPacketCount; |
| 288 | |
| 289 | return True; |
| 290 | } |
| 291 | |