1/**********
2This library is free software; you can redistribute it and/or modify it under
3the terms of the GNU Lesser General Public License as published by the
4Free Software Foundation; either version 3 of the License, or (at your
5option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7This library is distributed in the hope that it will be useful, but WITHOUT
8ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10more details.
11
12You should have received a copy of the GNU Lesser General Public License
13along with this library; if not, write to the Free Software Foundation, Inc.,
1451 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15**********/
16// "liveMedia"
17// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
18// A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP servers that acts as a 'proxy' for
19// another (unicast or multicast) RTSP/RTP stream.
20// Implementation
21
22#include "liveMedia.hh"
23#include "RTSPCommon.hh"
24#include "GroupsockHelper.hh" // for "our_random()"
25
26#ifndef MILLION
27#define MILLION 1000000
28#endif
29
30// A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream:
31
32class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession {
33public:
34 ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
35 portNumBits initialPortNum, Boolean multiplexRTCPWithRTP);
36 virtual ~ProxyServerMediaSubsession();
37
38 char const* codecName() const { return fCodecName; }
39 char const* url() const { return ((ProxyServerMediaSession*)fParentSession)->url(); }
40
41private: // redefined virtual functions
42 virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
43 unsigned& estBitrate);
44 virtual void closeStreamSource(FramedSource *inputSource);
45 virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
46 unsigned char rtpPayloadTypeIfDynamic,
47 FramedSource* inputSource);
48 virtual Groupsock* createGroupsock(struct in_addr const& addr, Port port);
49 virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
50 unsigned char const* cname, RTPSink* sink);
51
52private:
53 static void subsessionByeHandler(void* clientData);
54 void subsessionByeHandler();
55
56 int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; }
57
58private:
59 friend class ProxyRTSPClient;
60 MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession
61 char const* fCodecName; // copied from "fClientMediaSubsession" once it's been set up
62 ProxyServerMediaSubsession* fNext; // used when we're part of a queue
63 Boolean fHaveSetupStream;
64};
65
66
67////////// ProxyServerMediaSession implementation //////////
68
69UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging
70 return env << "ProxyServerMediaSession[" << psms.url() << "]";
71}
72
73ProxyRTSPClient*
74defaultCreateNewProxyRTSPClientFunc(ProxyServerMediaSession& ourServerMediaSession,
75 char const* rtspURL,
76 char const* username, char const* password,
77 portNumBits tunnelOverHTTPPortNum, int verbosityLevel,
78 int socketNumToServer) {
79 return new ProxyRTSPClient(ourServerMediaSession, rtspURL, username, password,
80 tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer);
81}
82
83ProxyServerMediaSession* ProxyServerMediaSession
84::createNew(UsageEnvironment& env, GenericMediaServer* ourMediaServer,
85 char const* inputStreamURL, char const* streamName,
86 char const* username, char const* password,
87 portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer,
88 MediaTranscodingTable* transcodingTable) {
89 return new ProxyServerMediaSession(env, ourMediaServer, inputStreamURL, streamName, username, password,
90 tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer,
91 transcodingTable);
92}
93
94
95ProxyServerMediaSession
96::ProxyServerMediaSession(UsageEnvironment& env, GenericMediaServer* ourMediaServer,
97 char const* inputStreamURL, char const* streamName,
98 char const* username, char const* password,
99 portNumBits tunnelOverHTTPPortNum, int verbosityLevel,
100 int socketNumToServer,
101 MediaTranscodingTable* transcodingTable,
102 createNewProxyRTSPClientFunc* ourCreateNewProxyRTSPClientFunc,
103 portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
104 : ServerMediaSession(env, streamName, NULL, NULL, False, NULL),
105 describeCompletedFlag(0), fOurMediaServer(ourMediaServer), fClientMediaSession(NULL),
106 fVerbosityLevel(verbosityLevel),
107 fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())),
108 fCreateNewProxyRTSPClientFunc(ourCreateNewProxyRTSPClientFunc),
109 fTranscodingTable(transcodingTable),
110 fInitialPortNum(initialPortNum), fMultiplexRTCPWithRTP(multiplexRTCPWithRTP) {
111 // Open a RTSP connection to the input stream, and send a "DESCRIBE" command.
112 // We'll use the SDP description in the response to set ourselves up.
113 fProxyRTSPClient
114 = (*fCreateNewProxyRTSPClientFunc)(*this, inputStreamURL, username, password,
115 tunnelOverHTTPPortNum,
116 verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel,
117 socketNumToServer);
118 ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient);
119}
120
121ProxyServerMediaSession::~ProxyServerMediaSession() {
122 if (fVerbosityLevel > 0) {
123 envir() << *this << "::~ProxyServerMediaSession()\n";
124 }
125
126 // Begin by sending a "TEARDOWN" command (without checking for a response):
127 if (fProxyRTSPClient != NULL && fClientMediaSession != NULL) {
128 fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth());
129 }
130
131 // Then delete our state:
132 Medium::close(fClientMediaSession);
133 Medium::close(fProxyRTSPClient);
134 Medium::close(fPresentationTimeSessionNormalizer);
135}
136
137char const* ProxyServerMediaSession::url() const {
138 return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url();
139}
140
141Groupsock* ProxyServerMediaSession::createGroupsock(struct in_addr const& addr, Port port) {
142 // Default implementation; may be redefined by subclasses:
143 return new Groupsock(envir(), addr, port, 255);
144}
145
146RTCPInstance* ProxyServerMediaSession
147::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
148 unsigned char const* cname, RTPSink* sink) {
149 // Default implementation; may be redefined by subclasses:
150 return RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/);
151}
152
153Boolean ProxyServerMediaSession::allowProxyingForSubsession(MediaSubsession const& /*mss*/) {
154 // Default implementation
155 return True;
156}
157
158void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) {
159 describeCompletedFlag = 1;
160
161 // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its
162 // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks.
163 do {
164 fClientMediaSession = MediaSession::createNew(envir(), sdpDescription);
165 if (fClientMediaSession == NULL) break;
166
167 MediaSubsessionIterator iter(*fClientMediaSession);
168 for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) {
169 if (!allowProxyingForSubsession(*mss)) continue;
170
171 ServerMediaSubsession* smss
172 = new ProxyServerMediaSubsession(*mss, fInitialPortNum, fMultiplexRTCPWithRTP);
173 addSubsession(smss);
174 if (fVerbosityLevel > 0) {
175 envir() << *this << " added new \"ProxyServerMediaSubsession\" for "
176 << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n";
177 }
178 }
179 } while (0);
180}
181
182void ProxyServerMediaSession::resetDESCRIBEState() {
183 // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE".
184 if (fOurMediaServer != NULL) {
185 // First, close any client connections that may have already been set up:
186 fOurMediaServer->closeAllClientSessionsForServerMediaSession(this);
187 }
188 deleteAllSubsessions();
189
190 // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE":
191 Medium::close(fClientMediaSession); fClientMediaSession = NULL;
192}
193
194///////// RTSP 'response handlers' //////////
195
196static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
197 char const* res;
198
199 if (resultCode == 0) {
200 // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description.
201 res = resultString;
202 } else {
203 // The "DESCRIBE" command failed.
204 res = NULL;
205 }
206 ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res);
207 delete[] resultString;
208}
209
210static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
211 ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP(resultCode);
212 delete[] resultString;
213}
214
215static void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) {
216 ((ProxyRTSPClient*)rtspClient)->continueAfterPLAY(resultCode);
217 delete[] resultString;
218}
219
220static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) {
221 Boolean serverSupportsGetParameter = False;
222 if (resultCode == 0) {
223 // Note whether the server told us that it supports the "GET_PARAMETER" command:
224 serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER", resultString);
225 }
226 ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter);
227 delete[] resultString;
228}
229
230#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
231static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
232 ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, True);
233 delete[] resultString;
234}
235#endif
236
237
238////////// "ProxyRTSPClient" implementation /////////
239
240UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging
241 return env << "ProxyRTSPClient[" << proxyRTSPClient.url() << "]";
242}
243
244ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL,
245 char const* username, char const* password,
246 portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer)
247 : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient",
248 tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer),
249 fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0),
250 fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1),
251 fServerSupportsGetParameter(False), fLastCommandWasPLAY(False), fDoneDESCRIBE(False),
252 fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL), fResetTask(NULL) {
253 if (username != NULL && password != NULL) {
254 fOurAuthenticator = new Authenticator(username, password);
255 } else {
256 fOurAuthenticator = NULL;
257 }
258}
259
260void ProxyRTSPClient::reset() {
261 envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL;
262 envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL;
263 envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL;
264 envir().taskScheduler().unscheduleDelayedTask(fResetTask); fResetTask = NULL;
265
266 fSetupQueueHead = fSetupQueueTail = NULL;
267 fNumSetupsDone = 0;
268 fNextDESCRIBEDelay = 1;
269 fLastCommandWasPLAY = False;
270 fDoneDESCRIBE = False;
271
272 RTSPClient::reset();
273}
274
275ProxyRTSPClient::~ProxyRTSPClient() {
276 reset();
277
278 delete fOurAuthenticator;
279 delete[] fOurURL;
280}
281
282int ProxyRTSPClient::connectToServer(int socketNum, portNumBits remotePortNum) {
283 int res;
284 res = RTSPClient::connectToServer(socketNum, remotePortNum);
285
286 if (res == 0 && fDoneDESCRIBE && fStreamRTPOverTCP) {
287 if (fVerbosityLevel > 0) {
288 envir() << "ProxyRTSPClient::connectToServer calling scheduleReset()\n";
289 }
290 scheduleReset();
291 }
292
293 return res;
294}
295
296void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) {
297 if (sdpDescription != NULL) {
298 fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription);
299
300 // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the
301 // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream.
302 // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness'
303 // ("OPTIONS" or "GET_PARAMETER") commands. (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets
304 // don't get sent until after the "PLAY" command.)
305 scheduleLivenessCommand();
306 } else {
307 // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running.
308 // Reschedule another "DESCRIBE" command to take place later:
309 scheduleDESCRIBECommand();
310 }
311 fDoneDESCRIBE = True;
312}
313
314void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) {
315 if (resultCode != 0) {
316 // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive.
317 // We handle this by resetting our connection state with this server. Any current clients will be closed, but
318 // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream.
319 // Then continue by sending more "DESCRIBE" commands, to try to restore the stream.
320
321 fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command
322
323 if (resultCode < 0) {
324 // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0).
325 // This suggests that the RTSP connection itself has failed. Print this error code, in case it's useful for debugging:
326 if (fVerbosityLevel > 0) {
327 envir() << *this << ": lost connection to server ('errno': " << -resultCode << "). Scheduling reset...\n";
328 }
329 }
330
331 scheduleReset();
332 return;
333 }
334
335 fServerSupportsGetParameter = serverSupportsGetParameter;
336
337 // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive):
338 scheduleLivenessCommand();
339}
340
341#define SUBSESSION_TIMEOUT_SECONDS 5 // how many seconds to wait for the last track's "SETUP" to be done (note below)
342
343void ProxyRTSPClient::continueAfterSETUP(int resultCode) {
344 if (resultCode != 0) {
345 // The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
346 // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
347 scheduleReset();
348 return;
349 }
350
351 if (fVerbosityLevel > 0) {
352 envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->codecName()
353 << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:";
354 for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) {
355 envir() << "\t" << p->codecName();
356 }
357 envir() << "\n";
358 }
359 envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set
360
361 // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done:
362 ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL
363 fSetupQueueHead = fSetupQueueHead->fNext;
364 if (fSetupQueueHead == NULL) fSetupQueueTail = NULL;
365
366 if (fSetupQueueHead != NULL) {
367 // There are still entries in the queue, for tracks for which we have still to do a "SETUP".
368 // "SETUP" the first of these now:
369 sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
370 False, fStreamRTPOverTCP, False, fOurAuthenticator);
371 ++fNumSetupsDone;
372 fSetupQueueHead->fHaveSetupStream = True;
373 } else {
374 if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
375 // We've now finished setting up each of our subsessions (i.e., 'tracks').
376 // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
377 sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
378 // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
379 // a "PLAY" before (as a result of a 'subsession timeout' (note below))
380 fLastCommandWasPLAY = True;
381 } else {
382 // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's
383 // also possible - if the remote client chose to play only some of the session's tracks - that they might not.
384 // To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP",
385 // then we send a "PLAY" command anyway:
386 fSubsessionTimerTask
387 = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
388 }
389 }
390}
391
392void ProxyRTSPClient::continueAfterPLAY(int resultCode) {
393 if (resultCode != 0) {
394 // The "PLAY" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
395 // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
396 scheduleReset();
397 return;
398 }
399}
400
401void ProxyRTSPClient::scheduleLivenessCommand() {
402 // Delay a random time before sending another 'liveness' command.
403 unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that
404 if (delayMax == 0) {
405 delayMax = 60;
406 }
407
408 // Choose a random time from [delayMax/2,delayMax-1) seconds:
409 unsigned const us_1stPart = delayMax*500000;
410 unsigned uSecondsToDelay;
411 if (us_1stPart <= 1000000) {
412 uSecondsToDelay = us_1stPart;
413 } else {
414 unsigned const us_2ndPart = us_1stPart-1000000;
415 uSecondsToDelay = us_1stPart + (us_2ndPart*our_random())%us_2ndPart;
416 }
417 fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this);
418}
419
420void ProxyRTSPClient::sendLivenessCommand(void* clientData) {
421 ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
422
423 // Note. By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously
424 // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER". This is because
425 // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER").
426#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
427 MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession;
428
429 if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) {
430 rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth());
431 } else {
432#endif
433 rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth());
434#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
435 }
436#endif
437}
438
439void ProxyRTSPClient::scheduleReset() {
440 if (fVerbosityLevel > 0) {
441 envir() << "ProxyRTSPClient::scheduleReset\n";
442 }
443 envir().taskScheduler().rescheduleDelayedTask(fResetTask, 0, doReset, this);
444}
445
446void ProxyRTSPClient::doReset() {
447 if (fVerbosityLevel > 0) {
448 envir() << *this << "::doReset\n";
449 }
450
451 reset();
452 fOurServerMediaSession.resetDESCRIBEState();
453
454 setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again
455 sendDESCRIBE(this);
456}
457
458void ProxyRTSPClient::doReset(void* clientData) {
459 ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
460 rtspClient->doReset();
461}
462
463void ProxyRTSPClient::scheduleDESCRIBECommand() {
464 // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE". Then, keep delaying a random time from [256..511] seconds:
465 unsigned secondsToDelay;
466 if (fNextDESCRIBEDelay <= 256) {
467 secondsToDelay = fNextDESCRIBEDelay;
468 fNextDESCRIBEDelay *= 2;
469 } else {
470 secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds
471 }
472
473 if (fVerbosityLevel > 0) {
474 envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n";
475 }
476 fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this);
477}
478
479void ProxyRTSPClient::sendDESCRIBE(void* clientData) {
480 ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
481 if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth());
482}
483
484void ProxyRTSPClient::subsessionTimeout(void* clientData) {
485 ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout();
486}
487
488void ProxyRTSPClient::handleSubsessionTimeout() {
489 // We still have one or more subsessions ('tracks') left to "SETUP". But we can't wait any longer for them. Send a "PLAY" now:
490 MediaSession* sess = fOurServerMediaSession.fClientMediaSession;
491 if (sess != NULL) sendPlayCommand(*sess, ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
492 fLastCommandWasPLAY = True;
493}
494
495
496//////// "ProxyServerMediaSubsession" implementation //////////
497
498ProxyServerMediaSubsession
499::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
500 portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
501 : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/,
502 initialPortNum, multiplexRTCPWithRTP),
503 fClientMediaSubsession(mediaSubsession), fCodecName(strDup(mediaSubsession.codecName())),
504 fNext(NULL), fHaveSetupStream(False) {
505}
506
507UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging
508 return env << "ProxyServerMediaSubsession[" << psmss.url() << "," << psmss.codecName() << "]";
509}
510
511ProxyServerMediaSubsession::~ProxyServerMediaSubsession() {
512 if (verbosityLevel() > 0) {
513 envir() << *this << "::~ProxyServerMediaSubsession()\n";
514 }
515
516 delete[] (char*)fCodecName;
517}
518
519FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) {
520 ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
521
522 if (verbosityLevel() > 0) {
523 envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n";
524 }
525
526 // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so:
527 if (fClientMediaSubsession.readSource() == NULL) {
528 if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("audio", "MPA-ROBUST")) fClientMediaSubsession.receiveRawMP3ADUs(); // hack for proxying MPA-ROBUST streams
529 if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("video", "JPEG")) fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams.
530 fClientMediaSubsession.initiate();
531 if (verbosityLevel() > 0) {
532 envir() << "\tInitiated: " << *this << "\n";
533 }
534
535 if (fClientMediaSubsession.readSource() != NULL) {
536 // First, check whether we have defined a 'transcoder' filter to be used with this codec:
537 if (sms->fTranscodingTable != NULL) {
538 char* outputCodecName;
539 FramedFilter* transcoder
540 = sms->fTranscodingTable->lookupTranscoder(fClientMediaSubsession, outputCodecName);
541 if (transcoder != NULL) {
542 fClientMediaSubsession.addFilter(transcoder);
543 delete[] (char*)fCodecName; fCodecName = outputCodecName;
544 }
545 }
546
547 // Then, add to the front of all data sources a filter that will 'normalize' their frames'
548 // presentation times, before the frames get re-transmitted by our server:
549 FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer
550 ->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(),
551 fClientMediaSubsession.rtpSource(),
552 fCodecName);
553 fClientMediaSubsession.addFilter(normalizerFilter);
554
555 // Some data sources require a 'framer' object to be added, before they can be fed into
556 // a "RTPSink". Adjust for this now:
557 if (strcmp(fCodecName, "H264") == 0) {
558 fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer
559 ::createNew(envir(), fClientMediaSubsession.readSource()));
560 } else if (strcmp(fCodecName, "H265") == 0) {
561 fClientMediaSubsession.addFilter(H265VideoStreamDiscreteFramer
562 ::createNew(envir(), fClientMediaSubsession.readSource()));
563 } else if (strcmp(fCodecName, "MP4V-ES") == 0) {
564 fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer
565 ::createNew(envir(), fClientMediaSubsession.readSource(),
566 True/* leave PTs unmodified*/));
567 } else if (strcmp(fCodecName, "MPV") == 0) {
568 fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer
569 ::createNew(envir(), fClientMediaSubsession.readSource(),
570 False, 5.0, True/* leave PTs unmodified*/));
571 } else if (strcmp(fCodecName, "DV") == 0) {
572 fClientMediaSubsession.addFilter(DVVideoStreamFramer
573 ::createNew(envir(), fClientMediaSubsession.readSource(),
574 False, True/* leave PTs unmodified*/));
575 }
576 }
577
578 if (fClientMediaSubsession.rtcpInstance() != NULL) {
579 fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this);
580 }
581 }
582
583 ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
584 if (clientSessionId != 0) {
585 // We're being called as a result of implementing a RTSP "SETUP".
586 if (!fHaveSetupStream) {
587 // This is our first "SETUP". Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming:
588 // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct
589 // "ProxyServerMediaSubsession" to handle the response. (Note that responses come back in the same order as requests.))
590 Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL;
591 if (queueWasEmpty) {
592 proxyRTSPClient->fSetupQueueHead = this;
593 proxyRTSPClient->fSetupQueueTail = this;
594 } else {
595 // Add ourself to the "RTSPClient"s 'SETUP queue' (if we're not already on it):
596 ProxyServerMediaSubsession* psms;
597 for (psms = proxyRTSPClient->fSetupQueueHead; psms != NULL; psms = psms->fNext) {
598 if (psms == this) break;
599 }
600 if (psms == NULL) {
601 proxyRTSPClient->fSetupQueueTail->fNext = this;
602 proxyRTSPClient->fSetupQueueTail = this;
603 }
604 }
605
606 // Hack: If there's already a pending "SETUP" request, don't send this track's "SETUP" right away, because
607 // the server might not properly handle 'pipelined' requests. Instead, wait until after previous "SETUP" responses come back.
608 if (queueWasEmpty) {
609 proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP,
610 False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth());
611 ++proxyRTSPClient->fNumSetupsDone;
612 fHaveSetupStream = True;
613 }
614 } else {
615 // This is a "SETUP" from a new client. We know that there are no other currently active clients (otherwise we wouldn't
616 // have been called here), so we know that the substream was previously "PAUSE"d. Send "PLAY" downstream once again,
617 // to resume the stream:
618 if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession
619 proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f/*resume from previous point*/,
620 -1.0f, 1.0f, proxyRTSPClient->auth());
621 proxyRTSPClient->fLastCommandWasPLAY = True;
622 }
623 }
624 }
625
626 estBitrate = fClientMediaSubsession.bandwidth();
627 if (estBitrate == 0) estBitrate = 50; // kbps, estimate
628 return fClientMediaSubsession.readSource();
629}
630
631void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) {
632 if (verbosityLevel() > 0) {
633 envir() << *this << "::closeStreamSource()\n";
634 }
635 // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it),
636 // we don't close the input source here. (Instead, we wait until *this* object gets deleted.)
637 // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream,
638 // then we "PAUSE" the downstream proxied stream, until a new client arrives:
639 if (fHaveSetupStream) {
640 ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
641 ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
642 if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession
643 if (fParentSession->referenceCount() > 1) {
644 // There are other client(s) still streaming other subsessions of this stream.
645 // Therefore, we don't send a "PAUSE" for the whole stream, but only for the sub-stream:
646 proxyRTSPClient->sendPauseCommand(fClientMediaSubsession, NULL, proxyRTSPClient->auth());
647 } else {
648 // Normal case: There are no other client still streaming (parts of) this stream.
649 // Send a "PAUSE" for the whole stream.
650 proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth());
651 proxyRTSPClient->fLastCommandWasPLAY = False;
652 }
653 }
654 }
655}
656
657RTPSink* ProxyServerMediaSubsession
658::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) {
659 if (verbosityLevel() > 0) {
660 envir() << *this << "::createNewRTPSink()\n";
661 }
662
663 // Create (and return) the appropriate "RTPSink" object for our codec:
664 // (Note: The configuration string might not be correct if a transcoder is used. FIX!) #####
665 RTPSink* newSink;
666 if (strcmp(fCodecName, "AC3") == 0 || strcmp(fCodecName, "EAC3") == 0) {
667 newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
668 fClientMediaSubsession.rtpTimestampFrequency());
669#if 0 // This code does not work; do *not* enable it:
670 } else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) {
671 Boolean isWideband = strcmp(fCodecName, "AMR-WB") == 0;
672 newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
673 isWideband, fClientMediaSubsession.numChannels());
674#endif
675 } else if (strcmp(fCodecName, "DV") == 0) {
676 newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
677 } else if (strcmp(fCodecName, "GSM") == 0) {
678 newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock);
679 } else if (strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0) {
680 newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
681 fClientMediaSubsession.rtpTimestampFrequency());
682 } else if (strcmp(fCodecName, "H264") == 0) {
683 newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
684 fClientMediaSubsession.fmtp_spropparametersets());
685 } else if (strcmp(fCodecName, "H265") == 0) {
686 newSink = H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
687 fClientMediaSubsession.fmtp_spropvps(),
688 fClientMediaSubsession.fmtp_spropsps(),
689 fClientMediaSubsession.fmtp_sproppps());
690 } else if (strcmp(fCodecName, "JPEG") == 0) {
691 newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video", "JPEG",
692 1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/);
693 } else if (strcmp(fCodecName, "MP4A-LATM") == 0) {
694 newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
695 fClientMediaSubsession.rtpTimestampFrequency(),
696 fClientMediaSubsession.fmtp_config(),
697 fClientMediaSubsession.numChannels());
698 } else if (strcmp(fCodecName, "MP4V-ES") == 0) {
699 newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
700 fClientMediaSubsession.rtpTimestampFrequency(),
701 fClientMediaSubsession.attrVal_unsigned("profile-level-id"),
702 fClientMediaSubsession.fmtp_config());
703 } else if (strcmp(fCodecName, "MPA") == 0) {
704 newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock);
705 } else if (strcmp(fCodecName, "MPA-ROBUST") == 0) {
706 newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
707 } else if (strcmp(fCodecName, "MPEG4-GENERIC") == 0) {
708 newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock,
709 rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
710 fClientMediaSubsession.mediumName(),
711 fClientMediaSubsession.attrVal_str("mode"),
712 fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels());
713 } else if (strcmp(fCodecName, "MPV") == 0) {
714 newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock);
715 } else if (strcmp(fCodecName, "OPUS") == 0) {
716 newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
717 48000, "audio", "OPUS", 2, False/*only 1 Opus 'packet' in each RTP packet*/);
718 } else if (strcmp(fCodecName, "T140") == 0) {
719 newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
720 } else if (strcmp(fCodecName, "THEORA") == 0) {
721 newSink = TheoraVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
722 fClientMediaSubsession.fmtp_config());
723 } else if (strcmp(fCodecName, "VORBIS") == 0) {
724 newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
725 fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(),
726 fClientMediaSubsession.fmtp_config());
727 } else if (strcmp(fCodecName, "VP8") == 0) {
728 newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
729 } else if (strcmp(fCodecName, "VP9") == 0) {
730 newSink = VP9VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
731 } else if (strcmp(fCodecName, "AMR") == 0 || strcmp(fCodecName, "AMR-WB") == 0) {
732 // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a
733 // form that can be fed directly into a corresponding "RTPSink" object.
734 if (verbosityLevel() > 0) {
735 envir() << "\treturns NULL (because we currently don't support the proxying of \""
736 << fClientMediaSubsession.mediumName() << "/" << fCodecName << "\" streams)\n";
737 }
738 return NULL;
739 } else if (strcmp(fCodecName, "QCELP") == 0 ||
740 strcmp(fCodecName, "H261") == 0 ||
741 strcmp(fCodecName, "H263-1998") == 0 || strcmp(fCodecName, "H263-2000") == 0 ||
742 strcmp(fCodecName, "X-QT") == 0 || strcmp(fCodecName, "X-QUICKTIME") == 0) {
743 // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it:
744 if (verbosityLevel() > 0) {
745 envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n";
746 }
747 return NULL;
748 } else {
749 // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink":
750 Boolean allowMultipleFramesPerPacket = True; // by default
751 Boolean doNormalMBitRule = True; // by default
752 // Some codecs change the above default parameters:
753 if (strcmp(fCodecName, "MP2T") == 0) {
754 doNormalMBitRule = False; // no RTP 'M' bit
755 }
756 newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock,
757 rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
758 fClientMediaSubsession.mediumName(), fCodecName,
759 fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule);
760 }
761
762 // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized,
763 // we temporarily disable RTCP "SR" reports for this "RTPSink" object:
764 newSink->enableRTCPReports() = False;
765
766 // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later:
767 PresentationTimeSubsessionNormalizer* ssNormalizer;
768 if (strcmp(fCodecName, "H264") == 0 ||
769 strcmp(fCodecName, "H265") == 0 ||
770 strcmp(fCodecName, "MP4V-ES") == 0 ||
771 strcmp(fCodecName, "MPV") == 0 ||
772 strcmp(fCodecName, "DV") == 0) {
773 // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it:
774 ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource());
775 } else {
776 ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource;
777 }
778 ssNormalizer->setRTPSink(newSink);
779
780 return newSink;
781}
782
783Groupsock* ProxyServerMediaSubsession::createGroupsock(struct in_addr const& addr, Port port) {
784 ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession;
785 return parentSession->createGroupsock(addr, port);
786}
787
788RTCPInstance* ProxyServerMediaSubsession
789::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
790 unsigned char const* cname, RTPSink* sink) {
791 ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession;
792 return parentSession->createRTCP(RTCPgs, totSessionBW, cname, sink);
793}
794
795void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) {
796 ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler();
797}
798
799void ProxyServerMediaSubsession::subsessionByeHandler() {
800 if (verbosityLevel() > 0) {
801 envir() << *this << ": received RTCP \"BYE\". (The back-end stream has ended.)\n";
802 }
803
804 // This "BYE" signals that our input source has (effectively) closed, so pass this onto the front-end clients:
805 fHaveSetupStream = False; // hack to stop "PAUSE" getting sent by:
806 if (fClientMediaSubsession.readSource() != NULL) {
807 fClientMediaSubsession.readSource()->handleClosure();
808 }
809
810 // And then treat this as if we had lost connection to the back-end server,
811 // and can reestablish streaming from it only by sending another "DESCRIBE":
812 ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
813 ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
814 proxyRTSPClient->scheduleReset();
815}
816
817
818////////// PresentationTimeSessionNormalizer and PresentationTimeSubsessionNormalizer implementations //////////
819
820// PresentationTimeSessionNormalizer:
821
822PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env)
823 : Medium(env),
824 fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) {
825}
826
827PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() {
828 while (fSubsessionNormalizers != NULL) {
829 Medium::close(fSubsessionNormalizers);
830 }
831}
832
833PresentationTimeSubsessionNormalizer* PresentationTimeSessionNormalizer
834::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource,
835 char const* codecName) {
836 fSubsessionNormalizers
837 = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers);
838 return fSubsessionNormalizers;
839}
840
841void PresentationTimeSessionNormalizer
842::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer,
843 struct timeval& toPT, struct timeval const& fromPT) {
844 Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP();
845
846 if (!hasBeenSynced) {
847 // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus
848 // is already aligned with 'wall-clock' time. Just copy it 'as is' to "toPT":
849 toPT = fromPT;
850 } else {
851 if (fMasterSSNormalizer == NULL) {
852 // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock'
853 // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with
854 // those of the master:
855 fMasterSSNormalizer = ssNormalizer;
856
857 struct timeval timeNow;
858 gettimeofday(&timeNow, NULL);
859
860 // Compute: fPTAdjustment = timeNow - fromPT
861 fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec;
862 fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec;
863 // Note: It's OK if one or both of these fields underflows; the result still works out OK later.
864 }
865
866 // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment
867 toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1;
868 toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION;
869 while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; }
870
871 // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink":
872 RTPSink* const rtpSink = ssNormalizer->fRTPSink;
873 if (rtpSink != NULL) { // sanity check; should always be true
874 rtpSink->enableRTCPReports() = True;
875 }
876 }
877}
878
879void PresentationTimeSessionNormalizer
880::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) {
881 // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"):
882 if (fSubsessionNormalizers == ssNormalizer) {
883 fSubsessionNormalizers = fSubsessionNormalizers->fNext;
884 } else {
885 PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext);
886 while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext);
887 *ssPtrPtr = (*ssPtrPtr)->fNext;
888 }
889}
890
891// PresentationTimeSubsessionNormalizer:
892
893PresentationTimeSubsessionNormalizer
894::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource,
895 char const* codecName, PresentationTimeSubsessionNormalizer* next)
896 : FramedFilter(parent.envir(), inputSource),
897 fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) {
898}
899
900PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() {
901 fParent.removePresentationTimeSubsessionNormalizer(this);
902}
903
904void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize,
905 unsigned numTruncatedBytes,
906 struct timeval presentationTime,
907 unsigned durationInMicroseconds) {
908 ((PresentationTimeSubsessionNormalizer*)clientData)
909 ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
910}
911
912void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize,
913 unsigned numTruncatedBytes,
914 struct timeval presentationTime,
915 unsigned durationInMicroseconds) {
916 // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed:
917 fFrameSize = frameSize;
918 fNumTruncatedBytes = numTruncatedBytes;
919 fDurationInMicroseconds = durationInMicroseconds;
920
921 fParent.normalizePresentationTime(this, fPresentationTime, presentationTime);
922
923 // Hack for JPEG/RTP proxying. Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them,
924 // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink":
925 if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG") == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket();
926
927 // Complete delivery:
928 FramedSource::afterGetting(this);
929}
930
931void PresentationTimeSubsessionNormalizer::doGetNextFrame() {
932 fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this);
933}
934