1 | /********** |
2 | This library is free software; you can redistribute it and/or modify it under |
3 | the terms of the GNU Lesser General Public License as published by the |
4 | Free Software Foundation; either version 3 of the License, or (at your |
5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
6 | |
7 | This library is distributed in the hope that it will be useful, but WITHOUT |
8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
10 | more details. |
11 | |
12 | You should have received a copy of the GNU Lesser General Public License |
13 | along with this library; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
15 | **********/ |
16 | // "liveMedia" |
17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
// A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP server that acts as a 'proxy' for
19 | // another (unicast or multicast) RTSP/RTP stream. |
20 | // Implementation |
21 | |
22 | #include "liveMedia.hh" |
23 | #include "RTSPCommon.hh" |
24 | #include "GroupsockHelper.hh" // for "our_random()" |
25 | |
26 | #ifndef MILLION |
27 | #define MILLION 1000000 |
28 | #endif |
29 | |
30 | // A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream: |
31 | |
// A "ServerMediaSubsession" that serves one proxied track. Each instance wraps one
// (client-side) "MediaSubsession" of the back-end stream, and re-streams its frames
// to our own (unicast) RTSP clients.
class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession {
public:
  ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
                             portNumBits initialPortNum, Boolean multiplexRTCPWithRTP);
  virtual ~ProxyServerMediaSubsession();

  char const* codecName() const { return fCodecName; }
  // The URL of the back-end stream (delegated to our parent "ProxyServerMediaSession"):
  char const* url() const { return ((ProxyServerMediaSession*)fParentSession)->url(); }

private: // redefined virtual functions
  // Sets up (and, on the first "SETUP", starts) the proxied back-end stream:
  virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
                                              unsigned& estBitrate);
  // Called when the last of our clients stops streaming; "PAUSE"s the back-end stream:
  virtual void closeStreamSource(FramedSource *inputSource);
  virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
                                    unsigned char rtpPayloadTypeIfDynamic,
                                    FramedSource* inputSource);
  virtual Groupsock* createGroupsock(struct in_addr const& addr, Port port);
  virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */
                                   unsigned char const* cname, RTPSink* sink);

private:
  // Handler for a RTCP "BYE" from the back-end server (static trampoline + member):
  static void subsessionByeHandler(void* clientData);
  void subsessionByeHandler();

  int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; }

private:
  friend class ProxyRTSPClient;
  MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession
  char const* fCodecName; // copied from "fClientMediaSubsession" once it's been set up
  ProxyServerMediaSubsession* fNext; // used when we're part of a queue (the 'SETUP queue')
  Boolean fHaveSetupStream; // True once we've sent a "SETUP" for this track to the back-end server
};
65 | |
66 | |
67 | ////////// ProxyServerMediaSession implementation ////////// |
68 | |
69 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging |
70 | return env << "ProxyServerMediaSession[" << psms.url() << "]" ; |
71 | } |
72 | |
73 | ProxyRTSPClient* |
74 | defaultCreateNewProxyRTSPClientFunc(ProxyServerMediaSession& ourServerMediaSession, |
75 | char const* rtspURL, |
76 | char const* username, char const* password, |
77 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, |
78 | int socketNumToServer) { |
79 | return new ProxyRTSPClient(ourServerMediaSession, rtspURL, username, password, |
80 | tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer); |
81 | } |
82 | |
83 | ProxyServerMediaSession* ProxyServerMediaSession |
84 | ::createNew(UsageEnvironment& env, GenericMediaServer* ourMediaServer, |
85 | char const* inputStreamURL, char const* streamName, |
86 | char const* username, char const* password, |
87 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer, |
88 | MediaTranscodingTable* transcodingTable) { |
89 | return new ProxyServerMediaSession(env, ourMediaServer, inputStreamURL, streamName, username, password, |
90 | tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer, |
91 | transcodingTable); |
92 | } |
93 | |
94 | |
// Constructor. Creates the back-end "ProxyRTSPClient" (via the supplied factory function)
// and immediately sends a "DESCRIBE" to the back-end server; the SDP response (handled in
// "continueAfterDESCRIBE()") is what populates our server-side subsessions.
ProxyServerMediaSession
::ProxyServerMediaSession(UsageEnvironment& env, GenericMediaServer* ourMediaServer,
			  char const* inputStreamURL, char const* streamName,
			  char const* username, char const* password,
			  portNumBits tunnelOverHTTPPortNum, int verbosityLevel,
			  int socketNumToServer,
			  MediaTranscodingTable* transcodingTable,
			  createNewProxyRTSPClientFunc* ourCreateNewProxyRTSPClientFunc,
			  portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
  : ServerMediaSession(env, streamName, NULL, NULL, False, NULL),
    describeCompletedFlag(0), fOurMediaServer(ourMediaServer), fClientMediaSession(NULL),
    fVerbosityLevel(verbosityLevel),
    fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())),
    fCreateNewProxyRTSPClientFunc(ourCreateNewProxyRTSPClientFunc),
    fTranscodingTable(transcodingTable),
    fInitialPortNum(initialPortNum), fMultiplexRTCPWithRTP(multiplexRTCPWithRTP) {
  // Open a RTSP connection to the input stream, and send a "DESCRIBE" command.
  // We'll use the SDP description in the response to set ourselves up.
  fProxyRTSPClient
    = (*fCreateNewProxyRTSPClientFunc)(*this, inputStreamURL, username, password,
				       tunnelOverHTTPPortNum,
				       verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel,
				       socketNumToServer);
  ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient);
}
120 | |
121 | ProxyServerMediaSession::~ProxyServerMediaSession() { |
122 | if (fVerbosityLevel > 0) { |
123 | envir() << *this << "::~ProxyServerMediaSession()\n" ; |
124 | } |
125 | |
126 | // Begin by sending a "TEARDOWN" command (without checking for a response): |
127 | if (fProxyRTSPClient != NULL && fClientMediaSession != NULL) { |
128 | fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth()); |
129 | } |
130 | |
131 | // Then delete our state: |
132 | Medium::close(fClientMediaSession); |
133 | Medium::close(fProxyRTSPClient); |
134 | Medium::close(fPresentationTimeSessionNormalizer); |
135 | } |
136 | |
137 | char const* ProxyServerMediaSession::url() const { |
138 | return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url(); |
139 | } |
140 | |
141 | Groupsock* ProxyServerMediaSession::createGroupsock(struct in_addr const& addr, Port port) { |
142 | // Default implementation; may be redefined by subclasses: |
143 | return new Groupsock(envir(), addr, port, 255); |
144 | } |
145 | |
146 | RTCPInstance* ProxyServerMediaSession |
147 | ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ |
148 | unsigned char const* cname, RTPSink* sink) { |
149 | // Default implementation; may be redefined by subclasses: |
150 | return RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/); |
151 | } |
152 | |
153 | Boolean ProxyServerMediaSession::allowProxyingForSubsession(MediaSubsession const& /*mss*/) { |
154 | // Default implementation |
155 | return True; |
156 | } |
157 | |
158 | void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) { |
159 | describeCompletedFlag = 1; |
160 | |
161 | // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its |
162 | // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks. |
163 | do { |
164 | fClientMediaSession = MediaSession::createNew(envir(), sdpDescription); |
165 | if (fClientMediaSession == NULL) break; |
166 | |
167 | MediaSubsessionIterator iter(*fClientMediaSession); |
168 | for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) { |
169 | if (!allowProxyingForSubsession(*mss)) continue; |
170 | |
171 | ServerMediaSubsession* smss |
172 | = new ProxyServerMediaSubsession(*mss, fInitialPortNum, fMultiplexRTCPWithRTP); |
173 | addSubsession(smss); |
174 | if (fVerbosityLevel > 0) { |
175 | envir() << *this << " added new \"ProxyServerMediaSubsession\" for " |
176 | << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n" ; |
177 | } |
178 | } |
179 | } while (0); |
180 | } |
181 | |
182 | void ProxyServerMediaSession::resetDESCRIBEState() { |
183 | // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE". |
184 | if (fOurMediaServer != NULL) { |
185 | // First, close any client connections that may have already been set up: |
186 | fOurMediaServer->closeAllClientSessionsForServerMediaSession(this); |
187 | } |
188 | deleteAllSubsessions(); |
189 | |
190 | // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE": |
191 | Medium::close(fClientMediaSession); fClientMediaSession = NULL; |
192 | } |
193 | |
194 | ///////// RTSP 'response handlers' ////////// |
195 | |
196 | static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) { |
197 | char const* res; |
198 | |
199 | if (resultCode == 0) { |
200 | // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description. |
201 | res = resultString; |
202 | } else { |
203 | // The "DESCRIBE" command failed. |
204 | res = NULL; |
205 | } |
206 | ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res); |
207 | delete[] resultString; |
208 | } |
209 | |
210 | static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) { |
211 | ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP(resultCode); |
212 | delete[] resultString; |
213 | } |
214 | |
215 | static void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) { |
216 | ((ProxyRTSPClient*)rtspClient)->continueAfterPLAY(resultCode); |
217 | delete[] resultString; |
218 | } |
219 | |
220 | static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) { |
221 | Boolean serverSupportsGetParameter = False; |
222 | if (resultCode == 0) { |
223 | // Note whether the server told us that it supports the "GET_PARAMETER" command: |
224 | serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER" , resultString); |
225 | } |
226 | ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter); |
227 | delete[] resultString; |
228 | } |
229 | |
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
// Response handler for our periodic "GET_PARAMETER" 'liveness' commands.
// (We pass "True" for "serverSupportsGetParameter", because a "GET_PARAMETER" command was sent only
//  after the server had already told us - in an "OPTIONS" response - that it supports it.)
static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
  ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, True);
  delete[] resultString;
}
#endif
236 | |
237 | |
238 | ////////// "ProxyRTSPClient" implementation ///////// |
239 | |
240 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging |
241 | return env << "ProxyRTSPClient[" << proxyRTSPClient.url() << "]" ; |
242 | } |
243 | |
// Constructor for our back-end RTSP client. Note:
// - A "tunnelOverHTTPPortNum" of (portNumBits)(~0) is treated specially: it requests
//   RTP-over-TCP streaming (see "fStreamRTPOverTCP") without actually tunneling over HTTP
//   (0 is passed to the "RTSPClient" base class).
// - An "Authenticator" is created only if both a username and a password were supplied.
ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL,
                                 char const* username, char const* password,
                                 portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer)
  : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient",
	       tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer),
    fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0),
    fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1),
    fServerSupportsGetParameter(False), fLastCommandWasPLAY(False), fDoneDESCRIBE(False),
    fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL), fResetTask(NULL) {
  if (username != NULL && password != NULL) {
    fOurAuthenticator = new Authenticator(username, password);
  } else {
    fOurAuthenticator = NULL;
  }
}
259 | |
// Returns this object to its initial (pre-"DESCRIBE") state:
// cancels all pending delayed tasks, clears the 'SETUP queue' and our progress flags,
// then resets the "RTSPClient" base class. (Called from "doReset()" and the destructor.)
void ProxyRTSPClient::reset() {
  // Cancel any tasks that are still scheduled, so they can't fire against stale state:
  envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL;
  envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL;
  envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL;
  envir().taskScheduler().unscheduleDelayedTask(fResetTask); fResetTask = NULL;

  // Empty the 'SETUP queue', and reset our progress counters/flags:
  fSetupQueueHead = fSetupQueueTail = NULL;
  fNumSetupsDone = 0;
  fNextDESCRIBEDelay = 1;
  fLastCommandWasPLAY = False;
  fDoneDESCRIBE = False;

  RTSPClient::reset();
}
274 | |
275 | ProxyRTSPClient::~ProxyRTSPClient() { |
276 | reset(); |
277 | |
278 | delete fOurAuthenticator; |
279 | delete[] fOurURL; |
280 | } |
281 | |
282 | int ProxyRTSPClient::connectToServer(int socketNum, portNumBits remotePortNum) { |
283 | int res; |
284 | res = RTSPClient::connectToServer(socketNum, remotePortNum); |
285 | |
286 | if (res == 0 && fDoneDESCRIBE && fStreamRTPOverTCP) { |
287 | if (fVerbosityLevel > 0) { |
288 | envir() << "ProxyRTSPClient::connectToServer calling scheduleReset()\n" ; |
289 | } |
290 | scheduleReset(); |
291 | } |
292 | |
293 | return res; |
294 | } |
295 | |
296 | void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) { |
297 | if (sdpDescription != NULL) { |
298 | fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription); |
299 | |
300 | // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the |
301 | // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream. |
302 | // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness' |
303 | // ("OPTIONS" or "GET_PARAMETER") commands. (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets |
304 | // don't get sent until after the "PLAY" command.) |
305 | scheduleLivenessCommand(); |
306 | } else { |
307 | // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running. |
308 | // Reschedule another "DESCRIBE" command to take place later: |
309 | scheduleDESCRIBECommand(); |
310 | } |
311 | fDoneDESCRIBE = True; |
312 | } |
313 | |
314 | void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) { |
315 | if (resultCode != 0) { |
316 | // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive. |
317 | // We handle this by resetting our connection state with this server. Any current clients will be closed, but |
318 | // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream. |
319 | // Then continue by sending more "DESCRIBE" commands, to try to restore the stream. |
320 | |
321 | fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command |
322 | |
323 | if (resultCode < 0) { |
324 | // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0). |
325 | // This suggests that the RTSP connection itself has failed. Print this error code, in case it's useful for debugging: |
326 | if (fVerbosityLevel > 0) { |
327 | envir() << *this << ": lost connection to server ('errno': " << -resultCode << "). Scheduling reset...\n" ; |
328 | } |
329 | } |
330 | |
331 | scheduleReset(); |
332 | return; |
333 | } |
334 | |
335 | fServerSupportsGetParameter = serverSupportsGetParameter; |
336 | |
337 | // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive): |
338 | scheduleLivenessCommand(); |
339 | } |
340 | |
341 | #define SUBSESSION_TIMEOUT_SECONDS 5 // how many seconds to wait for the last track's "SETUP" to be done (note below) |
342 | |
// Handles the outcome of one "SETUP" command sent to the back-end server.
// The "ProxyServerMediaSubsession" at the head of our 'SETUP queue' is the one whose
// "SETUP" this response answers (responses come back in the same order as requests).
// After dequeuing it we either "SETUP" the next queued track, send an 'aggregate' "PLAY"
// (if every track is now set up), or start a timer to force a "PLAY" if the remaining
// tracks are never "SETUP" by the client.
void ProxyRTSPClient::continueAfterSETUP(int resultCode) {
  if (resultCode != 0) {
    // The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the
    // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".)
    scheduleReset();
    return;
  }

  if (fVerbosityLevel > 0) {
    envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->codecName()
	    << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:";
    for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) {
      envir() << "\t" << p->codecName();
    }
    envir() << "\n";
  }
  envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set

  // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done:
  ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL
  fSetupQueueHead = fSetupQueueHead->fNext;
  if (fSetupQueueHead == NULL) fSetupQueueTail = NULL;

  if (fSetupQueueHead != NULL) {
    // There are still entries in the queue, for tracks for which we have still to do a "SETUP".
    // "SETUP" the first of these now:
    sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
		     False, fStreamRTPOverTCP, False, fOurAuthenticator);
    ++fNumSetupsDone;
    fSetupQueueHead->fHaveSetupStream = True;
  } else {
    if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
      // We've now finished setting up each of our subsessions (i.e., 'tracks').
      // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
      sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
      // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
      // a "PLAY" before (as a result of a 'subsession timeout' (note below))
      fLastCommandWasPLAY = True;
    } else {
      // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's
      // also possible - if the remote client chose to play only some of the session's tracks - that they might not.
      // To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP",
      // then we send a "PLAY" command anyway:
      fSubsessionTimerTask
	= envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
    }
  }
}
391 | |
392 | void ProxyRTSPClient::continueAfterPLAY(int resultCode) { |
393 | if (resultCode != 0) { |
394 | // The "PLAY" command failed, so arrange to reset the state. (We don't do this now, because it deletes the |
395 | // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".) |
396 | scheduleReset(); |
397 | return; |
398 | } |
399 | } |
400 | |
401 | void ProxyRTSPClient::scheduleLivenessCommand() { |
402 | // Delay a random time before sending another 'liveness' command. |
403 | unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that |
404 | if (delayMax == 0) { |
405 | delayMax = 60; |
406 | } |
407 | |
408 | // Choose a random time from [delayMax/2,delayMax-1) seconds: |
409 | unsigned const us_1stPart = delayMax*500000; |
410 | unsigned uSecondsToDelay; |
411 | if (us_1stPart <= 1000000) { |
412 | uSecondsToDelay = us_1stPart; |
413 | } else { |
414 | unsigned const us_2ndPart = us_1stPart-1000000; |
415 | uSecondsToDelay = us_1stPart + (us_2ndPart*our_random())%us_2ndPart; |
416 | } |
417 | fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this); |
418 | } |
419 | |
// Task-scheduler callback: sends the periodic 'liveness' command to the back-end server.
// Sends "OPTIONS", unless SEND_GET_PARAMETER_IF_SUPPORTED was defined at compile time, the
// server advertised "GET_PARAMETER" support, and a session already exists.
void ProxyRTSPClient::sendLivenessCommand(void* clientData) {
  ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;

  // Note. By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously
  // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER". This is because
  // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER").
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
  MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession;

  if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) {
    // Send "GET_PARAMETER" (with an empty parameter name) as the liveness probe:
    rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth());
  } else {
#endif
  rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth());
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
  }
#endif
}
438 | |
439 | void ProxyRTSPClient::scheduleReset() { |
440 | if (fVerbosityLevel > 0) { |
441 | envir() << "ProxyRTSPClient::scheduleReset\n" ; |
442 | } |
443 | envir().taskScheduler().rescheduleDelayedTask(fResetTask, 0, doReset, this); |
444 | } |
445 | |
// Performs the actual reset (scheduled by "scheduleReset()"): clears our own state and our
// "ProxyServerMediaSession"s DESCRIBE-derived state, then starts over with a fresh "DESCRIBE".
void ProxyRTSPClient::doReset() {
  if (fVerbosityLevel > 0) {
    envir() << *this << "::doReset\n";
  }

  reset(); // cancel pending tasks and clear our queue/flags first
  fOurServerMediaSession.resetDESCRIBEState();

  setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again
  sendDESCRIBE(this);
}
457 | |
458 | void ProxyRTSPClient::doReset(void* clientData) { |
459 | ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; |
460 | rtspClient->doReset(); |
461 | } |
462 | |
463 | void ProxyRTSPClient::scheduleDESCRIBECommand() { |
464 | // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE". Then, keep delaying a random time from [256..511] seconds: |
465 | unsigned secondsToDelay; |
466 | if (fNextDESCRIBEDelay <= 256) { |
467 | secondsToDelay = fNextDESCRIBEDelay; |
468 | fNextDESCRIBEDelay *= 2; |
469 | } else { |
470 | secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds |
471 | } |
472 | |
473 | if (fVerbosityLevel > 0) { |
474 | envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n" ; |
475 | } |
476 | fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this); |
477 | } |
478 | |
479 | void ProxyRTSPClient::sendDESCRIBE(void* clientData) { |
480 | ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; |
481 | if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth()); |
482 | } |
483 | |
484 | void ProxyRTSPClient::subsessionTimeout(void* clientData) { |
485 | ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout(); |
486 | } |
487 | |
488 | void ProxyRTSPClient::handleSubsessionTimeout() { |
489 | // We still have one or more subsessions ('tracks') left to "SETUP". But we can't wait any longer for them. Send a "PLAY" now: |
490 | MediaSession* sess = fOurServerMediaSession.fClientMediaSession; |
491 | if (sess != NULL) sendPlayCommand(*sess, ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator); |
492 | fLastCommandWasPLAY = True; |
493 | } |
494 | |
495 | |
496 | //////// "ProxyServerMediaSubsession" implementation ////////// |
497 | |
// Constructor: wraps the given back-end (client) "MediaSubsession". We pass
// True for "reuseFirstSource" because there's a single back-end data source that all of
// our own clients share. The codec name is copied now, via "strDup()" (it may later be
// replaced if a transcoder is installed - see "createNewStreamSource()").
ProxyServerMediaSubsession
::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession,
			     portNumBits initialPortNum, Boolean multiplexRTCPWithRTP)
  : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/,
				  initialPortNum, multiplexRTCPWithRTP),
    fClientMediaSubsession(mediaSubsession), fCodecName(strDup(mediaSubsession.codecName())),
    fNext(NULL), fHaveSetupStream(False) {
}
506 | |
507 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging |
508 | return env << "ProxyServerMediaSubsession[" << psmss.url() << "," << psmss.codecName() << "]" ; |
509 | } |
510 | |
// Destructor: frees the codec-name string that we own (allocated via "strDup()" in the
// constructor, or replaced by a transcoder's output codec name).
ProxyServerMediaSubsession::~ProxyServerMediaSubsession() {
  if (verbosityLevel() > 0) {
    envir() << *this << "::~ProxyServerMediaSubsession()\n";
  }

  delete[] (char*)fCodecName;
}
518 | |
// Redefined from "OnDemandServerMediaSubsession". Returns the (shared) data source for this
// track, creating it on first use, and - when called for a real client "SETUP"
// (clientSessionId != 0) - drives the back-end "SETUP"/"PLAY" state machine:
//  - first "SETUP" ever: enqueue ourselves on the "ProxyRTSPClient"s 'SETUP queue' and
//    (if no "SETUP" is already pending) send a "SETUP" to the back-end server;
//  - later "SETUP" after all clients had gone away: re-send "PLAY" to resume the
//    previously-"PAUSE"d back-end stream.
FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) {
  ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;

  if (verbosityLevel() > 0) {
    envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n";
  }

  // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so:
  if (fClientMediaSubsession.readSource() == NULL) {
    // (These two calls must precede "initiate()"; each is skipped if a transcoder will handle that codec.)
    if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("audio", "MPA-ROBUST")) fClientMediaSubsession.receiveRawMP3ADUs(); // hack for proxying MPA-ROBUST streams
    if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("video", "JPEG")) fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams.
    fClientMediaSubsession.initiate();
    if (verbosityLevel() > 0) {
      envir() << "\tInitiated: " << *this << "\n";
    }

    if (fClientMediaSubsession.readSource() != NULL) {
      // First, check whether we have defined a 'transcoder' filter to be used with this codec:
      if (sms->fTranscodingTable != NULL) {
	char* outputCodecName;
	FramedFilter* transcoder
	  = sms->fTranscodingTable->lookupTranscoder(fClientMediaSubsession, outputCodecName);
	if (transcoder != NULL) {
	  fClientMediaSubsession.addFilter(transcoder);
	  // The transcoder changes the codec; replace our stored codec name with its output:
	  delete[] (char*)fCodecName; fCodecName = outputCodecName;
	}
      }

      // Then, add to the front of all data sources a filter that will 'normalize' their frames'
      // presentation times, before the frames get re-transmitted by our server:
      FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer
	->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(),
							fClientMediaSubsession.rtpSource(),
							fCodecName);
      fClientMediaSubsession.addFilter(normalizerFilter);

      // Some data sources require a 'framer' object to be added, before they can be fed into
      // a "RTPSink". Adjust for this now:
      if (strcmp(fCodecName, "H264") == 0) {
	fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer
					 ::createNew(envir(), fClientMediaSubsession.readSource()));
      } else if (strcmp(fCodecName, "H265") == 0) {
	fClientMediaSubsession.addFilter(H265VideoStreamDiscreteFramer
					 ::createNew(envir(), fClientMediaSubsession.readSource()));
      } else if (strcmp(fCodecName, "MP4V-ES") == 0) {
	fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer
					 ::createNew(envir(), fClientMediaSubsession.readSource(),
						     True/* leave PTs unmodified*/));
      } else if (strcmp(fCodecName, "MPV") == 0) {
	fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer
					 ::createNew(envir(), fClientMediaSubsession.readSource(),
						     False, 5.0, True/* leave PTs unmodified*/));
      } else if (strcmp(fCodecName, "DV") == 0) {
	fClientMediaSubsession.addFilter(DVVideoStreamFramer
					 ::createNew(envir(), fClientMediaSubsession.readSource(),
						     False, True/* leave PTs unmodified*/));
      }
    }

    // Ask to be told if the back-end server sends us a RTCP "BYE" for this track:
    if (fClientMediaSubsession.rtcpInstance() != NULL) {
      fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this);
    }
  }

  ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
  if (clientSessionId != 0) {
    // We're being called as a result of implementing a RTSP "SETUP".
    if (!fHaveSetupStream) {
      // This is our first "SETUP". Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming:
      // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct
      // "ProxyServerMediaSubsession" to handle the response. (Note that responses come back in the same order as requests.))
      Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL;
      if (queueWasEmpty) {
	proxyRTSPClient->fSetupQueueHead = this;
	proxyRTSPClient->fSetupQueueTail = this;
      } else {
	// Add ourself to the "RTSPClient"s 'SETUP queue' (if we're not already on it):
	ProxyServerMediaSubsession* psms;
	for (psms = proxyRTSPClient->fSetupQueueHead; psms != NULL; psms = psms->fNext) {
	  if (psms == this) break;
	}
	if (psms == NULL) {
	  proxyRTSPClient->fSetupQueueTail->fNext = this;
	  proxyRTSPClient->fSetupQueueTail = this;
	}
      }

      // Hack: If there's already a pending "SETUP" request, don't send this track's "SETUP" right away, because
      // the server might not properly handle 'pipelined' requests. Instead, wait until after previous "SETUP" responses come back.
      if (queueWasEmpty) {
	proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP,
					  False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth());
	++proxyRTSPClient->fNumSetupsDone;
	fHaveSetupStream = True;
      }
    } else {
      // This is a "SETUP" from a new client. We know that there are no other currently active clients (otherwise we wouldn't
      // have been called here), so we know that the substream was previously "PAUSE"d. Send "PLAY" downstream once again,
      // to resume the stream:
      if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession
	proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f/*resume from previous point*/,
					 -1.0f, 1.0f, proxyRTSPClient->auth());
	proxyRTSPClient->fLastCommandWasPLAY = True;
      }
    }
  }

  estBitrate = fClientMediaSubsession.bandwidth();
  if (estBitrate == 0) estBitrate = 50; // kbps, estimate
  return fClientMediaSubsession.readSource();
}
630 | |
// Redefined from "OnDemandServerMediaSubsession". Called when the last of our own clients
// stops streaming this track. We deliberately do NOT close the shared back-end source here;
// instead we "PAUSE" the back-end stream (whole-session or per-track) until a client returns.
void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) {
  if (verbosityLevel() > 0) {
    envir() << *this << "::closeStreamSource()\n";
  }
  // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it),
  // we don't close the input source here. (Instead, we wait until *this* object gets deleted.)
  // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream,
  // then we "PAUSE" the downstream proxied stream, until a new client arrives:
  if (fHaveSetupStream) {
    ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
    ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
    if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession
      if (fParentSession->referenceCount() > 1) {
	// There are other client(s) still streaming other subsessions of this stream.
	// Therefore, we don't send a "PAUSE" for the whole stream, but only for the sub-stream:
	proxyRTSPClient->sendPauseCommand(fClientMediaSubsession, NULL, proxyRTSPClient->auth());
      } else {
	// Normal case: There are no other client still streaming (parts of) this stream.
	// Send a "PAUSE" for the whole stream.
	proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth());
	proxyRTSPClient->fLastCommandWasPLAY = False; // only the whole-stream "PAUSE" clears this flag
      }
    }
  }
}
656 | |
657 | RTPSink* ProxyServerMediaSubsession |
658 | ::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) { |
659 | if (verbosityLevel() > 0) { |
660 | envir() << *this << "::createNewRTPSink()\n" ; |
661 | } |
662 | |
663 | // Create (and return) the appropriate "RTPSink" object for our codec: |
664 | // (Note: The configuration string might not be correct if a transcoder is used. FIX!) ##### |
665 | RTPSink* newSink; |
666 | if (strcmp(fCodecName, "AC3" ) == 0 || strcmp(fCodecName, "EAC3" ) == 0) { |
667 | newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
668 | fClientMediaSubsession.rtpTimestampFrequency()); |
669 | #if 0 // This code does not work; do *not* enable it: |
670 | } else if (strcmp(fCodecName, "AMR" ) == 0 || strcmp(fCodecName, "AMR-WB" ) == 0) { |
671 | Boolean isWideband = strcmp(fCodecName, "AMR-WB" ) == 0; |
672 | newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
673 | isWideband, fClientMediaSubsession.numChannels()); |
674 | #endif |
675 | } else if (strcmp(fCodecName, "DV" ) == 0) { |
676 | newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
677 | } else if (strcmp(fCodecName, "GSM" ) == 0) { |
678 | newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock); |
679 | } else if (strcmp(fCodecName, "H263-1998" ) == 0 || strcmp(fCodecName, "H263-2000" ) == 0) { |
680 | newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
681 | fClientMediaSubsession.rtpTimestampFrequency()); |
682 | } else if (strcmp(fCodecName, "H264" ) == 0) { |
683 | newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
684 | fClientMediaSubsession.fmtp_spropparametersets()); |
685 | } else if (strcmp(fCodecName, "H265" ) == 0) { |
686 | newSink = H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
687 | fClientMediaSubsession.fmtp_spropvps(), |
688 | fClientMediaSubsession.fmtp_spropsps(), |
689 | fClientMediaSubsession.fmtp_sproppps()); |
690 | } else if (strcmp(fCodecName, "JPEG" ) == 0) { |
691 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video" , "JPEG" , |
692 | 1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/); |
693 | } else if (strcmp(fCodecName, "MP4A-LATM" ) == 0) { |
694 | newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
695 | fClientMediaSubsession.rtpTimestampFrequency(), |
696 | fClientMediaSubsession.fmtp_config(), |
697 | fClientMediaSubsession.numChannels()); |
698 | } else if (strcmp(fCodecName, "MP4V-ES" ) == 0) { |
699 | newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
700 | fClientMediaSubsession.rtpTimestampFrequency(), |
701 | fClientMediaSubsession.attrVal_unsigned("profile-level-id" ), |
702 | fClientMediaSubsession.fmtp_config()); |
703 | } else if (strcmp(fCodecName, "MPA" ) == 0) { |
704 | newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock); |
705 | } else if (strcmp(fCodecName, "MPA-ROBUST" ) == 0) { |
706 | newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
707 | } else if (strcmp(fCodecName, "MPEG4-GENERIC" ) == 0) { |
708 | newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock, |
709 | rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), |
710 | fClientMediaSubsession.mediumName(), |
711 | fClientMediaSubsession.attrVal_str("mode" ), |
712 | fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels()); |
713 | } else if (strcmp(fCodecName, "MPV" ) == 0) { |
714 | newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock); |
715 | } else if (strcmp(fCodecName, "OPUS" ) == 0) { |
716 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
717 | 48000, "audio" , "OPUS" , 2, False/*only 1 Opus 'packet' in each RTP packet*/); |
718 | } else if (strcmp(fCodecName, "T140" ) == 0) { |
719 | newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
720 | } else if (strcmp(fCodecName, "THEORA" ) == 0) { |
721 | newSink = TheoraVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
722 | fClientMediaSubsession.fmtp_config()); |
723 | } else if (strcmp(fCodecName, "VORBIS" ) == 0) { |
724 | newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
725 | fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(), |
726 | fClientMediaSubsession.fmtp_config()); |
727 | } else if (strcmp(fCodecName, "VP8" ) == 0) { |
728 | newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
729 | } else if (strcmp(fCodecName, "VP9" ) == 0) { |
730 | newSink = VP9VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
731 | } else if (strcmp(fCodecName, "AMR" ) == 0 || strcmp(fCodecName, "AMR-WB" ) == 0) { |
732 | // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a |
733 | // form that can be fed directly into a corresponding "RTPSink" object. |
734 | if (verbosityLevel() > 0) { |
735 | envir() << "\treturns NULL (because we currently don't support the proxying of \"" |
736 | << fClientMediaSubsession.mediumName() << "/" << fCodecName << "\" streams)\n" ; |
737 | } |
738 | return NULL; |
739 | } else if (strcmp(fCodecName, "QCELP" ) == 0 || |
740 | strcmp(fCodecName, "H261" ) == 0 || |
741 | strcmp(fCodecName, "H263-1998" ) == 0 || strcmp(fCodecName, "H263-2000" ) == 0 || |
742 | strcmp(fCodecName, "X-QT" ) == 0 || strcmp(fCodecName, "X-QUICKTIME" ) == 0) { |
743 | // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it: |
744 | if (verbosityLevel() > 0) { |
745 | envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n" ; |
746 | } |
747 | return NULL; |
748 | } else { |
749 | // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink": |
750 | Boolean allowMultipleFramesPerPacket = True; // by default |
751 | Boolean doNormalMBitRule = True; // by default |
752 | // Some codecs change the above default parameters: |
753 | if (strcmp(fCodecName, "MP2T" ) == 0) { |
754 | doNormalMBitRule = False; // no RTP 'M' bit |
755 | } |
756 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, |
757 | rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), |
758 | fClientMediaSubsession.mediumName(), fCodecName, |
759 | fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule); |
760 | } |
761 | |
762 | // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized, |
763 | // we temporarily disable RTCP "SR" reports for this "RTPSink" object: |
764 | newSink->enableRTCPReports() = False; |
765 | |
766 | // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later: |
767 | PresentationTimeSubsessionNormalizer* ssNormalizer; |
768 | if (strcmp(fCodecName, "H264" ) == 0 || |
769 | strcmp(fCodecName, "H265" ) == 0 || |
770 | strcmp(fCodecName, "MP4V-ES" ) == 0 || |
771 | strcmp(fCodecName, "MPV" ) == 0 || |
772 | strcmp(fCodecName, "DV" ) == 0) { |
773 | // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it: |
774 | ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource()); |
775 | } else { |
776 | ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource; |
777 | } |
778 | ssNormalizer->setRTPSink(newSink); |
779 | |
780 | return newSink; |
781 | } |
782 | |
783 | Groupsock* ProxyServerMediaSubsession::createGroupsock(struct in_addr const& addr, Port port) { |
784 | ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; |
785 | return parentSession->createGroupsock(addr, port); |
786 | } |
787 | |
788 | RTCPInstance* ProxyServerMediaSubsession |
789 | ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ |
790 | unsigned char const* cname, RTPSink* sink) { |
791 | ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; |
792 | return parentSession->createRTCP(RTCPgs, totSessionBW, cname, sink); |
793 | } |
794 | |
795 | void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) { |
796 | ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler(); |
797 | } |
798 | |
799 | void ProxyServerMediaSubsession::subsessionByeHandler() { |
800 | if (verbosityLevel() > 0) { |
801 | envir() << *this << ": received RTCP \"BYE\". (The back-end stream has ended.)\n" ; |
802 | } |
803 | |
804 | // This "BYE" signals that our input source has (effectively) closed, so pass this onto the front-end clients: |
805 | fHaveSetupStream = False; // hack to stop "PAUSE" getting sent by: |
806 | if (fClientMediaSubsession.readSource() != NULL) { |
807 | fClientMediaSubsession.readSource()->handleClosure(); |
808 | } |
809 | |
810 | // And then treat this as if we had lost connection to the back-end server, |
811 | // and can reestablish streaming from it only by sending another "DESCRIBE": |
812 | ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; |
813 | ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; |
814 | proxyRTSPClient->scheduleReset(); |
815 | } |
816 | |
817 | |
818 | ////////// PresentationTimeSessionNormalizer and PresentationTimeSubsessionNormalizer implementations ////////// |
819 | |
820 | // PresentationTimeSessionNormalizer: |
821 | |
// Constructs a session-level normalizer, with an (initially) empty linked list of
// subsession normalizers, and no 'master' subsession chosen yet.
PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env)
  : Medium(env),
    fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) {
}
826 | |
PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() {
  // Close each remaining subsession normalizer. Note that closing one invokes its destructor,
  // which unlinks it from "fSubsessionNormalizers" (via "removePresentationTimeSubsessionNormalizer");
  // that side effect is what makes this loop terminate.
  while (fSubsessionNormalizers != NULL) {
    Medium::close(fSubsessionNormalizers);
  }
}
832 | |
833 | PresentationTimeSubsessionNormalizer* PresentationTimeSessionNormalizer |
834 | ::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource, |
835 | char const* codecName) { |
836 | fSubsessionNormalizers |
837 | = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers); |
838 | return fSubsessionNormalizers; |
839 | } |
840 | |
841 | void PresentationTimeSessionNormalizer |
842 | ::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer, |
843 | struct timeval& toPT, struct timeval const& fromPT) { |
844 | Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP(); |
845 | |
846 | if (!hasBeenSynced) { |
847 | // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus |
848 | // is already aligned with 'wall-clock' time. Just copy it 'as is' to "toPT": |
849 | toPT = fromPT; |
850 | } else { |
851 | if (fMasterSSNormalizer == NULL) { |
852 | // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock' |
853 | // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with |
854 | // those of the master: |
855 | fMasterSSNormalizer = ssNormalizer; |
856 | |
857 | struct timeval timeNow; |
858 | gettimeofday(&timeNow, NULL); |
859 | |
860 | // Compute: fPTAdjustment = timeNow - fromPT |
861 | fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec; |
862 | fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec; |
863 | // Note: It's OK if one or both of these fields underflows; the result still works out OK later. |
864 | } |
865 | |
866 | // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment |
867 | toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1; |
868 | toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION; |
869 | while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; } |
870 | |
871 | // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink": |
872 | RTPSink* const rtpSink = ssNormalizer->fRTPSink; |
873 | if (rtpSink != NULL) { // sanity check; should always be true |
874 | rtpSink->enableRTCPReports() = True; |
875 | } |
876 | } |
877 | } |
878 | |
879 | void PresentationTimeSessionNormalizer |
880 | ::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) { |
881 | // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"): |
882 | if (fSubsessionNormalizers == ssNormalizer) { |
883 | fSubsessionNormalizers = fSubsessionNormalizers->fNext; |
884 | } else { |
885 | PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext); |
886 | while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext); |
887 | *ssPtrPtr = (*ssPtrPtr)->fNext; |
888 | } |
889 | } |
890 | |
891 | // PresentationTimeSubsessionNormalizer: |
892 | |
// Constructs a per-subsession normalizer: a "FramedFilter" inserted in front of "inputSource".
// "rtpSource" is later consulted for its RTCP-sync state (and RTP 'M' bit); "codecName" is
// stored as a pointer (not copied), so it must remain valid for this object's lifetime;
// "next" links this object into the parent's list of subsession normalizers.
// (The associated "RTPSink" is not known yet; it is supplied later via "setRTPSink()".)
PresentationTimeSubsessionNormalizer
::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource,
				       char const* codecName, PresentationTimeSubsessionNormalizer* next)
  : FramedFilter(parent.envir(), inputSource),
    fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) {
}
899 | |
PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() {
  // Unlink ourself from the parent's list of subsession normalizers:
  fParent.removePresentationTimeSubsessionNormalizer(this);
}
903 | |
904 | void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize, |
905 | unsigned numTruncatedBytes, |
906 | struct timeval presentationTime, |
907 | unsigned durationInMicroseconds) { |
908 | ((PresentationTimeSubsessionNormalizer*)clientData) |
909 | ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); |
910 | } |
911 | |
912 | void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize, |
913 | unsigned numTruncatedBytes, |
914 | struct timeval presentationTime, |
915 | unsigned durationInMicroseconds) { |
916 | // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed: |
917 | fFrameSize = frameSize; |
918 | fNumTruncatedBytes = numTruncatedBytes; |
919 | fDurationInMicroseconds = durationInMicroseconds; |
920 | |
921 | fParent.normalizePresentationTime(this, fPresentationTime, presentationTime); |
922 | |
923 | // Hack for JPEG/RTP proxying. Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them, |
924 | // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink": |
925 | if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG" ) == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket(); |
926 | |
927 | // Complete delivery: |
928 | FramedSource::afterGetting(this); |
929 | } |
930 | |
void PresentationTimeSubsessionNormalizer::doGetNextFrame() {
  // Simply forward the request to our input source, delivering into our own buffer;
  // "afterGettingFrame" (above) then normalizes the presentation time before completing delivery.
  fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this);
}
934 | |