1/**********
2This library is free software; you can redistribute it and/or modify it under
3the terms of the GNU Lesser General Public License as published by the
4Free Software Foundation; either version 3 of the License, or (at your
5option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7This library is distributed in the hope that it will be useful, but WITHOUT
8ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10more details.
11
12You should have received a copy of the GNU Lesser General Public License
13along with this library; if not, write to the Free Software Foundation, Inc.,
1451 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15**********/
16// "liveMedia"
17// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
18// A filter that passes through (unchanged) chunks that contain an integral number
19// of MPEG-2 Transport Stream packets, but returning (in "fDurationInMicroseconds")
20// an updated estimate of the time gap between chunks.
21// Implementation
22
23#include "MPEG2TransportStreamFramer.hh"
24#include <GroupsockHelper.hh> // for "gettimeofday()"
25
26#define TRANSPORT_PACKET_SIZE 188
27
28////////// Definitions of constants that control the behavior of this code /////////
29
30#if !defined(NEW_DURATION_WEIGHT)
31#define NEW_DURATION_WEIGHT 0.5
32 // How much weight to give to the latest duration measurement (must be <= 1)
33#endif
34
35#if !defined(TIME_ADJUSTMENT_FACTOR)
36#define TIME_ADJUSTMENT_FACTOR 0.8
37 // A factor by which to adjust the duration estimate to ensure that the overall
38 // packet transmission times remains matched with the PCR times (which will be the
39 // times that we expect receivers to play the incoming packets).
40 // (must be <= 1)
41#endif
42
43#if !defined(MAX_PLAYOUT_BUFFER_DURATION)
44#define MAX_PLAYOUT_BUFFER_DURATION 0.1 // (seconds)
45#endif
46
47#if !defined(PCR_PERIOD_VARIATION_RATIO)
48#define PCR_PERIOD_VARIATION_RATIO 0.5
49#endif
50
51////////// PIDStatus //////////
52
53class PIDStatus {
54public:
55 PIDStatus(double _firstClock, double _firstRealTime)
56 : firstClock(_firstClock), lastClock(_firstClock),
57 firstRealTime(_firstRealTime), lastRealTime(_firstRealTime),
58 lastPacketNum(0) {
59 }
60
61 double firstClock, lastClock, firstRealTime, lastRealTime;
62 u_int64_t lastPacketNum;
63};
64
65
66////////// MPEG2TransportStreamFramer //////////
67
68MPEG2TransportStreamFramer* MPEG2TransportStreamFramer
69::createNew(UsageEnvironment& env, FramedSource* inputSource) {
70 return new MPEG2TransportStreamFramer(env, inputSource);
71}
72
73MPEG2TransportStreamFramer
74::MPEG2TransportStreamFramer(UsageEnvironment& env, FramedSource* inputSource)
75 : FramedFilter(env, inputSource),
76 fTSPacketCount(0), fTSPacketDurationEstimate(0.0), fTSPCRCount(0),
77 fLimitNumTSPacketsToStream(False), fNumTSPacketsToStream(0),
78 fLimitTSPacketsToStreamByPCR(False), fPCRLimit(0.0) {
79 fPIDStatusTable = HashTable::create(ONE_WORD_HASH_KEYS);
80}
81
82MPEG2TransportStreamFramer::~MPEG2TransportStreamFramer() {
83 clearPIDStatusTable();
84 delete fPIDStatusTable;
85}
86
87void MPEG2TransportStreamFramer::clearPIDStatusTable() {
88 PIDStatus* pidStatus;
89 while ((pidStatus = (PIDStatus*)fPIDStatusTable->RemoveNext()) != NULL) {
90 delete pidStatus;
91 }
92}
93
94void MPEG2TransportStreamFramer::setNumTSPacketsToStream(unsigned long numTSRecordsToStream) {
95 fNumTSPacketsToStream = numTSRecordsToStream;
96 fLimitNumTSPacketsToStream = numTSRecordsToStream > 0;
97}
98
99void MPEG2TransportStreamFramer::setPCRLimit(float pcrLimit) {
100 fPCRLimit = pcrLimit;
101 fLimitTSPacketsToStreamByPCR = pcrLimit != 0.0;
102}
103
104void MPEG2TransportStreamFramer::doGetNextFrame() {
105 if (fLimitNumTSPacketsToStream) {
106 if (fNumTSPacketsToStream == 0) {
107 handleClosure();
108 return;
109 }
110 if (fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE < fMaxSize) {
111 fMaxSize = fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE;
112 }
113 }
114
115 // Read directly from our input source into our client's buffer:
116 fFrameSize = 0;
117 fInputSource->getNextFrame(fTo, fMaxSize,
118 afterGettingFrame, this,
119 FramedSource::handleClosure, this);
120}
121
122void MPEG2TransportStreamFramer::doStopGettingFrames() {
123 FramedFilter::doStopGettingFrames();
124 fTSPacketCount = 0;
125 fTSPCRCount = 0;
126
127 clearPIDStatusTable();
128}
129
130void MPEG2TransportStreamFramer
131::afterGettingFrame(void* clientData, unsigned frameSize,
132 unsigned /*numTruncatedBytes*/,
133 struct timeval presentationTime,
134 unsigned /*durationInMicroseconds*/) {
135 MPEG2TransportStreamFramer* framer = (MPEG2TransportStreamFramer*)clientData;
136 framer->afterGettingFrame1(frameSize, presentationTime);
137}
138
139#define TRANSPORT_SYNC_BYTE 0x47
140
141void MPEG2TransportStreamFramer::afterGettingFrame1(unsigned frameSize,
142 struct timeval presentationTime) {
143 fFrameSize += frameSize;
144 unsigned const numTSPackets = fFrameSize/TRANSPORT_PACKET_SIZE;
145 fNumTSPacketsToStream -= numTSPackets;
146 fFrameSize = numTSPackets*TRANSPORT_PACKET_SIZE; // an integral # of TS packets
147 if (fFrameSize == 0) {
148 // We didn't read a complete TS packet; assume that the input source has closed.
149 handleClosure();
150 return;
151 }
152
153 // Make sure the data begins with a sync byte:
154 unsigned syncBytePosition;
155 for (syncBytePosition = 0; syncBytePosition < fFrameSize; ++syncBytePosition) {
156 if (fTo[syncBytePosition] == TRANSPORT_SYNC_BYTE) break;
157 }
158 if (syncBytePosition == fFrameSize) {
159 envir() << "No Transport Stream sync byte in data.";
160 handleClosure();
161 return;
162 } else if (syncBytePosition > 0) {
163 // There's a sync byte, but not at the start of the data. Move the good data
164 // to the start of the buffer, then read more to fill it up again:
165 memmove(fTo, &fTo[syncBytePosition], fFrameSize - syncBytePosition);
166 fFrameSize -= syncBytePosition;
167 fInputSource->getNextFrame(&fTo[fFrameSize], syncBytePosition,
168 afterGettingFrame, this,
169 FramedSource::handleClosure, this);
170 return;
171 } // else normal case: the data begins with a sync byte
172
173 fPresentationTime = presentationTime;
174
175 // Scan through the TS packets that we read, and update our estimate of
176 // the duration of each packet:
177 struct timeval tvNow;
178 gettimeofday(&tvNow, NULL);
179 double timeNow = tvNow.tv_sec + tvNow.tv_usec/1000000.0;
180 for (unsigned i = 0; i < numTSPackets; ++i) {
181 if (!updateTSPacketDurationEstimate(&fTo[i*TRANSPORT_PACKET_SIZE], timeNow)) {
182 // We hit a preset limit (based on PCR) within the stream. Handle this as if the input source has closed:
183 handleClosure();
184 return;
185 }
186 }
187
188 fDurationInMicroseconds
189 = numTSPackets * (unsigned)(fTSPacketDurationEstimate*1000000);
190
191 // Complete the delivery to our client:
192 afterGetting(this);
193}
194
195Boolean MPEG2TransportStreamFramer::updateTSPacketDurationEstimate(unsigned char* pkt, double timeNow) {
196 // Sanity check: Make sure we start with the sync byte:
197 if (pkt[0] != TRANSPORT_SYNC_BYTE) {
198 envir() << "Missing sync byte!\n";
199 return True;
200 }
201
202 ++fTSPacketCount;
203
204 // If this packet doesn't contain a PCR, then we're not interested in it:
205 u_int8_t const adaptation_field_control = (pkt[3]&0x30)>>4;
206 if (adaptation_field_control != 2 && adaptation_field_control != 3) return True;
207 // there's no adaptation_field
208
209 u_int8_t const adaptation_field_length = pkt[4];
210 if (adaptation_field_length == 0) return True;
211
212 u_int8_t const discontinuity_indicator = pkt[5]&0x80;
213 u_int8_t const pcrFlag = pkt[5]&0x10;
214 if (pcrFlag == 0) return True; // no PCR
215
216 // There's a PCR. Get it, and the PID:
217 ++fTSPCRCount;
218 u_int32_t pcrBaseHigh = (pkt[6]<<24)|(pkt[7]<<16)|(pkt[8]<<8)|pkt[9];
219 double clock = pcrBaseHigh/45000.0;
220 if ((pkt[10]&0x80) != 0) clock += 1/90000.0; // add in low-bit (if set)
221 unsigned short pcrExt = ((pkt[10]&0x01)<<8) | pkt[11];
222 clock += pcrExt/27000000.0;
223 if (fLimitTSPacketsToStreamByPCR) {
224 if (clock > fPCRLimit) {
225 // We've hit a preset limit within the stream:
226 return False;
227 }
228 }
229
230 unsigned pid = ((pkt[1]&0x1F)<<8) | pkt[2];
231
232 // Check whether we already have a record of a PCR for this PID:
233 PIDStatus* pidStatus = (PIDStatus*)(fPIDStatusTable->Lookup((char*)pid));
234
235 if (pidStatus == NULL) {
236 // We're seeing this PID's PCR for the first time:
237 pidStatus = new PIDStatus(clock, timeNow);
238 fPIDStatusTable->Add((char*)pid, pidStatus);
239#ifdef DEBUG_PCR
240 fprintf(stderr, "PID 0x%x, FIRST PCR 0x%08x+%d:%03x == %f @ %f, pkt #%lu\n", pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, fTSPacketCount);
241#endif
242 } else {
243 // We've seen this PID's PCR before; update our per-packet duration estimate:
244 int64_t packetsSinceLast = (int64_t)(fTSPacketCount - pidStatus->lastPacketNum);
245 // it's "int64_t" because some compilers can't convert "u_int64_t" -> "double"
246 double durationPerPacket = (clock - pidStatus->lastClock)/packetsSinceLast;
247
248 // Hack (suggested by "Romain"): Don't update our estimate if this PCR appeared unusually quickly.
249 // (This can produce more accurate estimates for wildly VBR streams.)
250 double meanPCRPeriod = 0.0;
251 if (fTSPCRCount > 0) {
252 double tsPacketCount = (double)(int64_t)fTSPacketCount;
253 double tsPCRCount = (double)(int64_t)fTSPCRCount;
254 meanPCRPeriod = tsPacketCount/tsPCRCount;
255 if (packetsSinceLast < meanPCRPeriod*PCR_PERIOD_VARIATION_RATIO) return True;
256 }
257
258 if (fTSPacketDurationEstimate == 0.0) { // we've just started
259 fTSPacketDurationEstimate = durationPerPacket;
260 } else if (discontinuity_indicator == 0 && durationPerPacket >= 0.0) {
261 fTSPacketDurationEstimate
262 = durationPerPacket*NEW_DURATION_WEIGHT
263 + fTSPacketDurationEstimate*(1-NEW_DURATION_WEIGHT);
264
265 // Also adjust the duration estimate to try to ensure that the transmission
266 // rate matches the playout rate:
267 double transmitDuration = timeNow - pidStatus->firstRealTime;
268 double playoutDuration = clock - pidStatus->firstClock;
269 if (transmitDuration > playoutDuration) {
270 fTSPacketDurationEstimate *= TIME_ADJUSTMENT_FACTOR; // reduce estimate
271 } else if (transmitDuration + MAX_PLAYOUT_BUFFER_DURATION < playoutDuration) {
272 fTSPacketDurationEstimate /= TIME_ADJUSTMENT_FACTOR; // increase estimate
273 }
274 } else {
275 // the PCR has a discontinuity from its previous value; don't use it now,
276 // but reset our PCR and real-time values to compensate:
277 pidStatus->firstClock = clock;
278 pidStatus->firstRealTime = timeNow;
279 }
280#ifdef DEBUG_PCR
281 fprintf(stderr, "PID 0x%x, PCR 0x%08x+%d:%03x == %f @ %f (diffs %f @ %f), pkt #%lu, discon %d => this duration %f, new estimate %f, mean PCR period=%f\n", pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, clock - pidStatus->firstClock, timeNow - pidStatus->firstRealTime, fTSPacketCount, discontinuity_indicator != 0, durationPerPacket, fTSPacketDurationEstimate, meanPCRPeriod );
282#endif
283 }
284
285 pidStatus->lastClock = clock;
286 pidStatus->lastRealTime = timeNow;
287 pidStatus->lastPacketNum = fTSPacketCount;
288
289 return True;
290}
291