| 1 | /********** |
| 2 | This library is free software; you can redistribute it and/or modify it under |
| 3 | the terms of the GNU Lesser General Public License as published by the |
| 4 | Free Software Foundation; either version 3 of the License, or (at your |
| 5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
| 6 | |
| 7 | This library is distributed in the hope that it will be useful, but WITHOUT |
| 8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
| 10 | more details. |
| 11 | |
| 12 | You should have received a copy of the GNU Lesser General Public License |
| 13 | along with this library; if not, write to the Free Software Foundation, Inc., |
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 15 | **********/ |
| 16 | // "liveMedia" |
| 17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
// A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP server that acts as a 'proxy' for
// another (unicast or multicast) RTSP/RTP stream.
| 20 | // Implementation |
| 21 | |
| 22 | #include "liveMedia.hh" |
| 23 | #include "RTSPCommon.hh" |
| 24 | #include "GroupsockHelper.hh" // for "our_random()" |
| 25 | |
// Microseconds-per-second multiplier, used when converting second-based delays for the task scheduler.
// Only defined here if no earlier header has already provided it.
#if !defined(MILLION)
#define MILLION 1000000
#endif
| 29 | |
| 30 | // A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream: |
| 31 | |
| 32 | class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession { |
| 33 | public: |
| 34 | ProxyServerMediaSubsession(MediaSubsession& mediaSubsession, |
| 35 | portNumBits initialPortNum, Boolean multiplexRTCPWithRTP); |
| 36 | virtual ~ProxyServerMediaSubsession(); |
| 37 | |
| 38 | char const* codecName() const { return fCodecName; } |
| 39 | char const* url() const { return ((ProxyServerMediaSession*)fParentSession)->url(); } |
| 40 | |
| 41 | private: // redefined virtual functions |
| 42 | virtual FramedSource* createNewStreamSource(unsigned clientSessionId, |
| 43 | unsigned& estBitrate); |
| 44 | virtual void closeStreamSource(FramedSource *inputSource); |
| 45 | virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, |
| 46 | unsigned char rtpPayloadTypeIfDynamic, |
| 47 | FramedSource* inputSource); |
| 48 | virtual Groupsock* createGroupsock(struct in_addr const& addr, Port port); |
| 49 | virtual RTCPInstance* createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ |
| 50 | unsigned char const* cname, RTPSink* sink); |
| 51 | |
| 52 | private: |
| 53 | static void subsessionByeHandler(void* clientData); |
| 54 | void subsessionByeHandler(); |
| 55 | |
| 56 | int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; } |
| 57 | |
| 58 | private: |
| 59 | friend class ProxyRTSPClient; |
| 60 | MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession |
| 61 | char const* fCodecName; // copied from "fClientMediaSubsession" once it's been set up |
| 62 | ProxyServerMediaSubsession* fNext; // used when we're part of a queue |
| 63 | Boolean fHaveSetupStream; |
| 64 | }; |
| 65 | |
| 66 | |
| 67 | ////////// ProxyServerMediaSession implementation ////////// |
| 68 | |
| 69 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging |
| 70 | return env << "ProxyServerMediaSession[" << psms.url() << "]" ; |
| 71 | } |
| 72 | |
| 73 | ProxyRTSPClient* |
| 74 | defaultCreateNewProxyRTSPClientFunc(ProxyServerMediaSession& ourServerMediaSession, |
| 75 | char const* rtspURL, |
| 76 | char const* username, char const* password, |
| 77 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, |
| 78 | int socketNumToServer) { |
| 79 | return new ProxyRTSPClient(ourServerMediaSession, rtspURL, username, password, |
| 80 | tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer); |
| 81 | } |
| 82 | |
| 83 | ProxyServerMediaSession* ProxyServerMediaSession |
| 84 | ::createNew(UsageEnvironment& env, GenericMediaServer* ourMediaServer, |
| 85 | char const* inputStreamURL, char const* streamName, |
| 86 | char const* username, char const* password, |
| 87 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer, |
| 88 | MediaTranscodingTable* transcodingTable) { |
| 89 | return new ProxyServerMediaSession(env, ourMediaServer, inputStreamURL, streamName, username, password, |
| 90 | tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer, |
| 91 | transcodingTable); |
| 92 | } |
| 93 | |
| 94 | |
| 95 | ProxyServerMediaSession |
| 96 | ::ProxyServerMediaSession(UsageEnvironment& env, GenericMediaServer* ourMediaServer, |
| 97 | char const* inputStreamURL, char const* streamName, |
| 98 | char const* username, char const* password, |
| 99 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, |
| 100 | int socketNumToServer, |
| 101 | MediaTranscodingTable* transcodingTable, |
| 102 | createNewProxyRTSPClientFunc* ourCreateNewProxyRTSPClientFunc, |
| 103 | portNumBits initialPortNum, Boolean multiplexRTCPWithRTP) |
| 104 | : ServerMediaSession(env, streamName, NULL, NULL, False, NULL), |
| 105 | describeCompletedFlag(0), fOurMediaServer(ourMediaServer), fClientMediaSession(NULL), |
| 106 | fVerbosityLevel(verbosityLevel), |
| 107 | fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())), |
| 108 | fCreateNewProxyRTSPClientFunc(ourCreateNewProxyRTSPClientFunc), |
| 109 | fTranscodingTable(transcodingTable), |
| 110 | fInitialPortNum(initialPortNum), fMultiplexRTCPWithRTP(multiplexRTCPWithRTP) { |
| 111 | // Open a RTSP connection to the input stream, and send a "DESCRIBE" command. |
| 112 | // We'll use the SDP description in the response to set ourselves up. |
| 113 | fProxyRTSPClient |
| 114 | = (*fCreateNewProxyRTSPClientFunc)(*this, inputStreamURL, username, password, |
| 115 | tunnelOverHTTPPortNum, |
| 116 | verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel, |
| 117 | socketNumToServer); |
| 118 | ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient); |
| 119 | } |
| 120 | |
| 121 | ProxyServerMediaSession::~ProxyServerMediaSession() { |
| 122 | if (fVerbosityLevel > 0) { |
| 123 | envir() << *this << "::~ProxyServerMediaSession()\n" ; |
| 124 | } |
| 125 | |
| 126 | // Begin by sending a "TEARDOWN" command (without checking for a response): |
| 127 | if (fProxyRTSPClient != NULL && fClientMediaSession != NULL) { |
| 128 | fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth()); |
| 129 | } |
| 130 | |
| 131 | // Then delete our state: |
| 132 | Medium::close(fClientMediaSession); |
| 133 | Medium::close(fProxyRTSPClient); |
| 134 | Medium::close(fPresentationTimeSessionNormalizer); |
| 135 | } |
| 136 | |
| 137 | char const* ProxyServerMediaSession::url() const { |
| 138 | return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url(); |
| 139 | } |
| 140 | |
| 141 | Groupsock* ProxyServerMediaSession::createGroupsock(struct in_addr const& addr, Port port) { |
| 142 | // Default implementation; may be redefined by subclasses: |
| 143 | return new Groupsock(envir(), addr, port, 255); |
| 144 | } |
| 145 | |
| 146 | RTCPInstance* ProxyServerMediaSession |
| 147 | ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ |
| 148 | unsigned char const* cname, RTPSink* sink) { |
| 149 | // Default implementation; may be redefined by subclasses: |
| 150 | return RTCPInstance::createNew(envir(), RTCPgs, totSessionBW, cname, sink, NULL/*we're a server*/); |
| 151 | } |
| 152 | |
| 153 | Boolean ProxyServerMediaSession::allowProxyingForSubsession(MediaSubsession const& /*mss*/) { |
| 154 | // Default implementation |
| 155 | return True; |
| 156 | } |
| 157 | |
| 158 | void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) { |
| 159 | describeCompletedFlag = 1; |
| 160 | |
| 161 | // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its |
| 162 | // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks. |
| 163 | do { |
| 164 | fClientMediaSession = MediaSession::createNew(envir(), sdpDescription); |
| 165 | if (fClientMediaSession == NULL) break; |
| 166 | |
| 167 | MediaSubsessionIterator iter(*fClientMediaSession); |
| 168 | for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) { |
| 169 | if (!allowProxyingForSubsession(*mss)) continue; |
| 170 | |
| 171 | ServerMediaSubsession* smss |
| 172 | = new ProxyServerMediaSubsession(*mss, fInitialPortNum, fMultiplexRTCPWithRTP); |
| 173 | addSubsession(smss); |
| 174 | if (fVerbosityLevel > 0) { |
| 175 | envir() << *this << " added new \"ProxyServerMediaSubsession\" for " |
| 176 | << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n" ; |
| 177 | } |
| 178 | } |
| 179 | } while (0); |
| 180 | } |
| 181 | |
| 182 | void ProxyServerMediaSession::resetDESCRIBEState() { |
| 183 | // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE". |
| 184 | if (fOurMediaServer != NULL) { |
| 185 | // First, close any client connections that may have already been set up: |
| 186 | fOurMediaServer->closeAllClientSessionsForServerMediaSession(this); |
| 187 | } |
| 188 | deleteAllSubsessions(); |
| 189 | |
| 190 | // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE": |
| 191 | Medium::close(fClientMediaSession); fClientMediaSession = NULL; |
| 192 | } |
| 193 | |
| 194 | ///////// RTSP 'response handlers' ////////// |
| 195 | |
| 196 | static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) { |
| 197 | char const* res; |
| 198 | |
| 199 | if (resultCode == 0) { |
| 200 | // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description. |
| 201 | res = resultString; |
| 202 | } else { |
| 203 | // The "DESCRIBE" command failed. |
| 204 | res = NULL; |
| 205 | } |
| 206 | ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res); |
| 207 | delete[] resultString; |
| 208 | } |
| 209 | |
| 210 | static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) { |
| 211 | ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP(resultCode); |
| 212 | delete[] resultString; |
| 213 | } |
| 214 | |
| 215 | static void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) { |
| 216 | ((ProxyRTSPClient*)rtspClient)->continueAfterPLAY(resultCode); |
| 217 | delete[] resultString; |
| 218 | } |
| 219 | |
| 220 | static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) { |
| 221 | Boolean serverSupportsGetParameter = False; |
| 222 | if (resultCode == 0) { |
| 223 | // Note whether the server told us that it supports the "GET_PARAMETER" command: |
| 224 | serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER" , resultString); |
| 225 | } |
| 226 | ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter); |
| 227 | delete[] resultString; |
| 228 | } |
| 229 | |
#ifdef SEND_GET_PARAMETER_IF_SUPPORTED
// RTSP response handler for "GET_PARAMETER" (used as a 'liveness' command).  Because we sent this command,
// the server is reported as supporting it.  "resultString" is freed here afterwards.
static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
  ProxyRTSPClient* const proxyClient = (ProxyRTSPClient*)rtspClient;
  proxyClient->continueAfterLivenessCommand(resultCode, True);

  delete[] resultString;
}
#endif
| 236 | |
| 237 | |
| 238 | ////////// "ProxyRTSPClient" implementation ///////// |
| 239 | |
| 240 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging |
| 241 | return env << "ProxyRTSPClient[" << proxyRTSPClient.url() << "]" ; |
| 242 | } |
| 243 | |
| 244 | ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL, |
| 245 | char const* username, char const* password, |
| 246 | portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer) |
| 247 | : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient" , |
| 248 | tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer), |
| 249 | fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0), |
| 250 | fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1), |
| 251 | fServerSupportsGetParameter(False), fLastCommandWasPLAY(False), fDoneDESCRIBE(False), |
| 252 | fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL), fResetTask(NULL) { |
| 253 | if (username != NULL && password != NULL) { |
| 254 | fOurAuthenticator = new Authenticator(username, password); |
| 255 | } else { |
| 256 | fOurAuthenticator = NULL; |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | void ProxyRTSPClient::reset() { |
| 261 | envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL; |
| 262 | envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL; |
| 263 | envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL; |
| 264 | envir().taskScheduler().unscheduleDelayedTask(fResetTask); fResetTask = NULL; |
| 265 | |
| 266 | fSetupQueueHead = fSetupQueueTail = NULL; |
| 267 | fNumSetupsDone = 0; |
| 268 | fNextDESCRIBEDelay = 1; |
| 269 | fLastCommandWasPLAY = False; |
| 270 | fDoneDESCRIBE = False; |
| 271 | |
| 272 | RTSPClient::reset(); |
| 273 | } |
| 274 | |
| 275 | ProxyRTSPClient::~ProxyRTSPClient() { |
| 276 | reset(); |
| 277 | |
| 278 | delete fOurAuthenticator; |
| 279 | delete[] fOurURL; |
| 280 | } |
| 281 | |
| 282 | int ProxyRTSPClient::connectToServer(int socketNum, portNumBits remotePortNum) { |
| 283 | int res; |
| 284 | res = RTSPClient::connectToServer(socketNum, remotePortNum); |
| 285 | |
| 286 | if (res == 0 && fDoneDESCRIBE && fStreamRTPOverTCP) { |
| 287 | if (fVerbosityLevel > 0) { |
| 288 | envir() << "ProxyRTSPClient::connectToServer calling scheduleReset()\n" ; |
| 289 | } |
| 290 | scheduleReset(); |
| 291 | } |
| 292 | |
| 293 | return res; |
| 294 | } |
| 295 | |
| 296 | void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) { |
| 297 | if (sdpDescription != NULL) { |
| 298 | fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription); |
| 299 | |
| 300 | // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the |
| 301 | // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream. |
| 302 | // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness' |
| 303 | // ("OPTIONS" or "GET_PARAMETER") commands. (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets |
| 304 | // don't get sent until after the "PLAY" command.) |
| 305 | scheduleLivenessCommand(); |
| 306 | } else { |
| 307 | // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running. |
| 308 | // Reschedule another "DESCRIBE" command to take place later: |
| 309 | scheduleDESCRIBECommand(); |
| 310 | } |
| 311 | fDoneDESCRIBE = True; |
| 312 | } |
| 313 | |
| 314 | void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) { |
| 315 | if (resultCode != 0) { |
| 316 | // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive. |
| 317 | // We handle this by resetting our connection state with this server. Any current clients will be closed, but |
| 318 | // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream. |
| 319 | // Then continue by sending more "DESCRIBE" commands, to try to restore the stream. |
| 320 | |
| 321 | fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command |
| 322 | |
| 323 | if (resultCode < 0) { |
| 324 | // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0). |
| 325 | // This suggests that the RTSP connection itself has failed. Print this error code, in case it's useful for debugging: |
| 326 | if (fVerbosityLevel > 0) { |
| 327 | envir() << *this << ": lost connection to server ('errno': " << -resultCode << "). Scheduling reset...\n" ; |
| 328 | } |
| 329 | } |
| 330 | |
| 331 | scheduleReset(); |
| 332 | return; |
| 333 | } |
| 334 | |
| 335 | fServerSupportsGetParameter = serverSupportsGetParameter; |
| 336 | |
| 337 | // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive): |
| 338 | scheduleLivenessCommand(); |
| 339 | } |
| 340 | |
| 341 | #define SUBSESSION_TIMEOUT_SECONDS 5 // how many seconds to wait for the last track's "SETUP" to be done (note below) |
| 342 | |
| 343 | void ProxyRTSPClient::continueAfterSETUP(int resultCode) { |
| 344 | if (resultCode != 0) { |
| 345 | // The "SETUP" command failed, so arrange to reset the state. (We don't do this now, because it deletes the |
| 346 | // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".) |
| 347 | scheduleReset(); |
| 348 | return; |
| 349 | } |
| 350 | |
| 351 | if (fVerbosityLevel > 0) { |
| 352 | envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->codecName() |
| 353 | << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:" ; |
| 354 | for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) { |
| 355 | envir() << "\t" << p->codecName(); |
| 356 | } |
| 357 | envir() << "\n" ; |
| 358 | } |
| 359 | envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set |
| 360 | |
| 361 | // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'. It will be the one for which this "SETUP" was done: |
| 362 | ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL |
| 363 | fSetupQueueHead = fSetupQueueHead->fNext; |
| 364 | if (fSetupQueueHead == NULL) fSetupQueueTail = NULL; |
| 365 | |
| 366 | if (fSetupQueueHead != NULL) { |
| 367 | // There are still entries in the queue, for tracks for which we have still to do a "SETUP". |
| 368 | // "SETUP" the first of these now: |
| 369 | sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP, |
| 370 | False, fStreamRTPOverTCP, False, fOurAuthenticator); |
| 371 | ++fNumSetupsDone; |
| 372 | fSetupQueueHead->fHaveSetupStream = True; |
| 373 | } else { |
| 374 | if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) { |
| 375 | // We've now finished setting up each of our subsessions (i.e., 'tracks'). |
| 376 | // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session): |
| 377 | sendPlayCommand(smss->fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator); |
| 378 | // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done |
| 379 | // a "PLAY" before (as a result of a 'subsession timeout' (note below)) |
| 380 | fLastCommandWasPLAY = True; |
| 381 | } else { |
| 382 | // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP". They might get "SETUP" very soon, but it's |
| 383 | // also possible - if the remote client chose to play only some of the session's tracks - that they might not. |
| 384 | // To allow for this possibility, we set a timer. If the timer expires without the remaining subsessions getting "SETUP", |
| 385 | // then we send a "PLAY" command anyway: |
| 386 | fSubsessionTimerTask |
| 387 | = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this); |
| 388 | } |
| 389 | } |
| 390 | } |
| 391 | |
| 392 | void ProxyRTSPClient::continueAfterPLAY(int resultCode) { |
| 393 | if (resultCode != 0) { |
| 394 | // The "PLAY" command failed, so arrange to reset the state. (We don't do this now, because it deletes the |
| 395 | // "ProxyServerMediaSubsession", and we can't do that during "ProxyServerMediaSubsession::createNewStreamSource()".) |
| 396 | scheduleReset(); |
| 397 | return; |
| 398 | } |
| 399 | } |
| 400 | |
| 401 | void ProxyRTSPClient::scheduleLivenessCommand() { |
| 402 | // Delay a random time before sending another 'liveness' command. |
| 403 | unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that |
| 404 | if (delayMax == 0) { |
| 405 | delayMax = 60; |
| 406 | } |
| 407 | |
| 408 | // Choose a random time from [delayMax/2,delayMax-1) seconds: |
| 409 | unsigned const us_1stPart = delayMax*500000; |
| 410 | unsigned uSecondsToDelay; |
| 411 | if (us_1stPart <= 1000000) { |
| 412 | uSecondsToDelay = us_1stPart; |
| 413 | } else { |
| 414 | unsigned const us_2ndPart = us_1stPart-1000000; |
| 415 | uSecondsToDelay = us_1stPart + (us_2ndPart*our_random())%us_2ndPart; |
| 416 | } |
| 417 | fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this); |
| 418 | } |
| 419 | |
| 420 | void ProxyRTSPClient::sendLivenessCommand(void* clientData) { |
| 421 | ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; |
| 422 | |
| 423 | // Note. By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously |
| 424 | // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER". This is because |
| 425 | // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER"). |
| 426 | #ifdef SEND_GET_PARAMETER_IF_SUPPORTED |
| 427 | MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession; |
| 428 | |
| 429 | if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) { |
| 430 | rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "" , rtspClient->auth()); |
| 431 | } else { |
| 432 | #endif |
| 433 | rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth()); |
| 434 | #ifdef SEND_GET_PARAMETER_IF_SUPPORTED |
| 435 | } |
| 436 | #endif |
| 437 | } |
| 438 | |
| 439 | void ProxyRTSPClient::scheduleReset() { |
| 440 | if (fVerbosityLevel > 0) { |
| 441 | envir() << "ProxyRTSPClient::scheduleReset\n" ; |
| 442 | } |
| 443 | envir().taskScheduler().rescheduleDelayedTask(fResetTask, 0, doReset, this); |
| 444 | } |
| 445 | |
| 446 | void ProxyRTSPClient::doReset() { |
| 447 | if (fVerbosityLevel > 0) { |
| 448 | envir() << *this << "::doReset\n" ; |
| 449 | } |
| 450 | |
| 451 | reset(); |
| 452 | fOurServerMediaSession.resetDESCRIBEState(); |
| 453 | |
| 454 | setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again |
| 455 | sendDESCRIBE(this); |
| 456 | } |
| 457 | |
| 458 | void ProxyRTSPClient::doReset(void* clientData) { |
| 459 | ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; |
| 460 | rtspClient->doReset(); |
| 461 | } |
| 462 | |
| 463 | void ProxyRTSPClient::scheduleDESCRIBECommand() { |
| 464 | // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE". Then, keep delaying a random time from [256..511] seconds: |
| 465 | unsigned secondsToDelay; |
| 466 | if (fNextDESCRIBEDelay <= 256) { |
| 467 | secondsToDelay = fNextDESCRIBEDelay; |
| 468 | fNextDESCRIBEDelay *= 2; |
| 469 | } else { |
| 470 | secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds |
| 471 | } |
| 472 | |
| 473 | if (fVerbosityLevel > 0) { |
| 474 | envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n" ; |
| 475 | } |
| 476 | fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this); |
| 477 | } |
| 478 | |
| 479 | void ProxyRTSPClient::sendDESCRIBE(void* clientData) { |
| 480 | ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData; |
| 481 | if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth()); |
| 482 | } |
| 483 | |
| 484 | void ProxyRTSPClient::subsessionTimeout(void* clientData) { |
| 485 | ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout(); |
| 486 | } |
| 487 | |
| 488 | void ProxyRTSPClient::handleSubsessionTimeout() { |
| 489 | // We still have one or more subsessions ('tracks') left to "SETUP". But we can't wait any longer for them. Send a "PLAY" now: |
| 490 | MediaSession* sess = fOurServerMediaSession.fClientMediaSession; |
| 491 | if (sess != NULL) sendPlayCommand(*sess, ::continueAfterPLAY, -1.0f, -1.0f, 1.0f, fOurAuthenticator); |
| 492 | fLastCommandWasPLAY = True; |
| 493 | } |
| 494 | |
| 495 | |
| 496 | //////// "ProxyServerMediaSubsession" implementation ////////// |
| 497 | |
| 498 | ProxyServerMediaSubsession |
| 499 | ::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession, |
| 500 | portNumBits initialPortNum, Boolean multiplexRTCPWithRTP) |
| 501 | : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/, |
| 502 | initialPortNum, multiplexRTCPWithRTP), |
| 503 | fClientMediaSubsession(mediaSubsession), fCodecName(strDup(mediaSubsession.codecName())), |
| 504 | fNext(NULL), fHaveSetupStream(False) { |
| 505 | } |
| 506 | |
| 507 | UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging |
| 508 | return env << "ProxyServerMediaSubsession[" << psmss.url() << "," << psmss.codecName() << "]" ; |
| 509 | } |
| 510 | |
| 511 | ProxyServerMediaSubsession::~ProxyServerMediaSubsession() { |
| 512 | if (verbosityLevel() > 0) { |
| 513 | envir() << *this << "::~ProxyServerMediaSubsession()\n" ; |
| 514 | } |
| 515 | |
| 516 | delete[] (char*)fCodecName; |
| 517 | } |
| 518 | |
| 519 | FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) { |
| 520 | ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; |
| 521 | |
| 522 | if (verbosityLevel() > 0) { |
| 523 | envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n" ; |
| 524 | } |
| 525 | |
| 526 | // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so: |
| 527 | if (fClientMediaSubsession.readSource() == NULL) { |
| 528 | if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("audio" , "MPA-ROBUST" )) fClientMediaSubsession.receiveRawMP3ADUs(); // hack for proxying MPA-ROBUST streams |
| 529 | if (sms->fTranscodingTable == NULL || !sms->fTranscodingTable->weWillTranscode("video" , "JPEG" )) fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams. |
| 530 | fClientMediaSubsession.initiate(); |
| 531 | if (verbosityLevel() > 0) { |
| 532 | envir() << "\tInitiated: " << *this << "\n" ; |
| 533 | } |
| 534 | |
| 535 | if (fClientMediaSubsession.readSource() != NULL) { |
| 536 | // First, check whether we have defined a 'transcoder' filter to be used with this codec: |
| 537 | if (sms->fTranscodingTable != NULL) { |
| 538 | char* outputCodecName; |
| 539 | FramedFilter* transcoder |
| 540 | = sms->fTranscodingTable->lookupTranscoder(fClientMediaSubsession, outputCodecName); |
| 541 | if (transcoder != NULL) { |
| 542 | fClientMediaSubsession.addFilter(transcoder); |
| 543 | delete[] (char*)fCodecName; fCodecName = outputCodecName; |
| 544 | } |
| 545 | } |
| 546 | |
| 547 | // Then, add to the front of all data sources a filter that will 'normalize' their frames' |
| 548 | // presentation times, before the frames get re-transmitted by our server: |
| 549 | FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer |
| 550 | ->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(), |
| 551 | fClientMediaSubsession.rtpSource(), |
| 552 | fCodecName); |
| 553 | fClientMediaSubsession.addFilter(normalizerFilter); |
| 554 | |
| 555 | // Some data sources require a 'framer' object to be added, before they can be fed into |
| 556 | // a "RTPSink". Adjust for this now: |
| 557 | if (strcmp(fCodecName, "H264" ) == 0) { |
| 558 | fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer |
| 559 | ::createNew(envir(), fClientMediaSubsession.readSource())); |
| 560 | } else if (strcmp(fCodecName, "H265" ) == 0) { |
| 561 | fClientMediaSubsession.addFilter(H265VideoStreamDiscreteFramer |
| 562 | ::createNew(envir(), fClientMediaSubsession.readSource())); |
| 563 | } else if (strcmp(fCodecName, "MP4V-ES" ) == 0) { |
| 564 | fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer |
| 565 | ::createNew(envir(), fClientMediaSubsession.readSource(), |
| 566 | True/* leave PTs unmodified*/)); |
| 567 | } else if (strcmp(fCodecName, "MPV" ) == 0) { |
| 568 | fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer |
| 569 | ::createNew(envir(), fClientMediaSubsession.readSource(), |
| 570 | False, 5.0, True/* leave PTs unmodified*/)); |
| 571 | } else if (strcmp(fCodecName, "DV" ) == 0) { |
| 572 | fClientMediaSubsession.addFilter(DVVideoStreamFramer |
| 573 | ::createNew(envir(), fClientMediaSubsession.readSource(), |
| 574 | False, True/* leave PTs unmodified*/)); |
| 575 | } |
| 576 | } |
| 577 | |
| 578 | if (fClientMediaSubsession.rtcpInstance() != NULL) { |
| 579 | fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this); |
| 580 | } |
| 581 | } |
| 582 | |
| 583 | ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; |
| 584 | if (clientSessionId != 0) { |
| 585 | // We're being called as a result of implementing a RTSP "SETUP". |
| 586 | if (!fHaveSetupStream) { |
| 587 | // This is our first "SETUP". Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming: |
| 588 | // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct |
| 589 | // "ProxyServerMediaSubsession" to handle the response. (Note that responses come back in the same order as requests.)) |
| 590 | Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL; |
| 591 | if (queueWasEmpty) { |
| 592 | proxyRTSPClient->fSetupQueueHead = this; |
| 593 | proxyRTSPClient->fSetupQueueTail = this; |
| 594 | } else { |
| 595 | // Add ourself to the "RTSPClient"s 'SETUP queue' (if we're not already on it): |
| 596 | ProxyServerMediaSubsession* psms; |
| 597 | for (psms = proxyRTSPClient->fSetupQueueHead; psms != NULL; psms = psms->fNext) { |
| 598 | if (psms == this) break; |
| 599 | } |
| 600 | if (psms == NULL) { |
| 601 | proxyRTSPClient->fSetupQueueTail->fNext = this; |
| 602 | proxyRTSPClient->fSetupQueueTail = this; |
| 603 | } |
| 604 | } |
| 605 | |
| 606 | // Hack: If there's already a pending "SETUP" request, don't send this track's "SETUP" right away, because |
| 607 | // the server might not properly handle 'pipelined' requests. Instead, wait until after previous "SETUP" responses come back. |
| 608 | if (queueWasEmpty) { |
| 609 | proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP, |
| 610 | False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth()); |
| 611 | ++proxyRTSPClient->fNumSetupsDone; |
| 612 | fHaveSetupStream = True; |
| 613 | } |
| 614 | } else { |
| 615 | // This is a "SETUP" from a new client. We know that there are no other currently active clients (otherwise we wouldn't |
| 616 | // have been called here), so we know that the substream was previously "PAUSE"d. Send "PLAY" downstream once again, |
| 617 | // to resume the stream: |
| 618 | if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession |
| 619 | proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), ::continueAfterPLAY, -1.0f/*resume from previous point*/, |
| 620 | -1.0f, 1.0f, proxyRTSPClient->auth()); |
| 621 | proxyRTSPClient->fLastCommandWasPLAY = True; |
| 622 | } |
| 623 | } |
| 624 | } |
| 625 | |
| 626 | estBitrate = fClientMediaSubsession.bandwidth(); |
| 627 | if (estBitrate == 0) estBitrate = 50; // kbps, estimate |
| 628 | return fClientMediaSubsession.readSource(); |
| 629 | } |
| 630 | |
| 631 | void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) { |
| 632 | if (verbosityLevel() > 0) { |
| 633 | envir() << *this << "::closeStreamSource()\n" ; |
| 634 | } |
| 635 | // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it), |
| 636 | // we don't close the input source here. (Instead, we wait until *this* object gets deleted.) |
| 637 | // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream, |
| 638 | // then we "PAUSE" the downstream proxied stream, until a new client arrives: |
| 639 | if (fHaveSetupStream) { |
| 640 | ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; |
| 641 | ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; |
| 642 | if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession |
| 643 | if (fParentSession->referenceCount() > 1) { |
| 644 | // There are other client(s) still streaming other subsessions of this stream. |
| 645 | // Therefore, we don't send a "PAUSE" for the whole stream, but only for the sub-stream: |
| 646 | proxyRTSPClient->sendPauseCommand(fClientMediaSubsession, NULL, proxyRTSPClient->auth()); |
| 647 | } else { |
| 648 | // Normal case: There are no other client still streaming (parts of) this stream. |
| 649 | // Send a "PAUSE" for the whole stream. |
| 650 | proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth()); |
| 651 | proxyRTSPClient->fLastCommandWasPLAY = False; |
| 652 | } |
| 653 | } |
| 654 | } |
| 655 | } |
| 656 | |
| 657 | RTPSink* ProxyServerMediaSubsession |
| 658 | ::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) { |
| 659 | if (verbosityLevel() > 0) { |
| 660 | envir() << *this << "::createNewRTPSink()\n" ; |
| 661 | } |
| 662 | |
| 663 | // Create (and return) the appropriate "RTPSink" object for our codec: |
| 664 | // (Note: The configuration string might not be correct if a transcoder is used. FIX!) ##### |
| 665 | RTPSink* newSink; |
| 666 | if (strcmp(fCodecName, "AC3" ) == 0 || strcmp(fCodecName, "EAC3" ) == 0) { |
| 667 | newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 668 | fClientMediaSubsession.rtpTimestampFrequency()); |
| 669 | #if 0 // This code does not work; do *not* enable it: |
| 670 | } else if (strcmp(fCodecName, "AMR" ) == 0 || strcmp(fCodecName, "AMR-WB" ) == 0) { |
| 671 | Boolean isWideband = strcmp(fCodecName, "AMR-WB" ) == 0; |
| 672 | newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 673 | isWideband, fClientMediaSubsession.numChannels()); |
| 674 | #endif |
| 675 | } else if (strcmp(fCodecName, "DV" ) == 0) { |
| 676 | newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
| 677 | } else if (strcmp(fCodecName, "GSM" ) == 0) { |
| 678 | newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock); |
| 679 | } else if (strcmp(fCodecName, "H263-1998" ) == 0 || strcmp(fCodecName, "H263-2000" ) == 0) { |
| 680 | newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 681 | fClientMediaSubsession.rtpTimestampFrequency()); |
| 682 | } else if (strcmp(fCodecName, "H264" ) == 0) { |
| 683 | newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 684 | fClientMediaSubsession.fmtp_spropparametersets()); |
| 685 | } else if (strcmp(fCodecName, "H265" ) == 0) { |
| 686 | newSink = H265VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 687 | fClientMediaSubsession.fmtp_spropvps(), |
| 688 | fClientMediaSubsession.fmtp_spropsps(), |
| 689 | fClientMediaSubsession.fmtp_sproppps()); |
| 690 | } else if (strcmp(fCodecName, "JPEG" ) == 0) { |
| 691 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video" , "JPEG" , |
| 692 | 1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/); |
| 693 | } else if (strcmp(fCodecName, "MP4A-LATM" ) == 0) { |
| 694 | newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 695 | fClientMediaSubsession.rtpTimestampFrequency(), |
| 696 | fClientMediaSubsession.fmtp_config(), |
| 697 | fClientMediaSubsession.numChannels()); |
| 698 | } else if (strcmp(fCodecName, "MP4V-ES" ) == 0) { |
| 699 | newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 700 | fClientMediaSubsession.rtpTimestampFrequency(), |
| 701 | fClientMediaSubsession.attrVal_unsigned("profile-level-id" ), |
| 702 | fClientMediaSubsession.fmtp_config()); |
| 703 | } else if (strcmp(fCodecName, "MPA" ) == 0) { |
| 704 | newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock); |
| 705 | } else if (strcmp(fCodecName, "MPA-ROBUST" ) == 0) { |
| 706 | newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
| 707 | } else if (strcmp(fCodecName, "MPEG4-GENERIC" ) == 0) { |
| 708 | newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock, |
| 709 | rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), |
| 710 | fClientMediaSubsession.mediumName(), |
| 711 | fClientMediaSubsession.attrVal_str("mode" ), |
| 712 | fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels()); |
| 713 | } else if (strcmp(fCodecName, "MPV" ) == 0) { |
| 714 | newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock); |
| 715 | } else if (strcmp(fCodecName, "OPUS" ) == 0) { |
| 716 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 717 | 48000, "audio" , "OPUS" , 2, False/*only 1 Opus 'packet' in each RTP packet*/); |
| 718 | } else if (strcmp(fCodecName, "T140" ) == 0) { |
| 719 | newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
| 720 | } else if (strcmp(fCodecName, "THEORA" ) == 0) { |
| 721 | newSink = TheoraVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 722 | fClientMediaSubsession.fmtp_config()); |
| 723 | } else if (strcmp(fCodecName, "VORBIS" ) == 0) { |
| 724 | newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, |
| 725 | fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(), |
| 726 | fClientMediaSubsession.fmtp_config()); |
| 727 | } else if (strcmp(fCodecName, "VP8" ) == 0) { |
| 728 | newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
| 729 | } else if (strcmp(fCodecName, "VP9" ) == 0) { |
| 730 | newSink = VP9VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic); |
| 731 | } else if (strcmp(fCodecName, "AMR" ) == 0 || strcmp(fCodecName, "AMR-WB" ) == 0) { |
| 732 | // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a |
| 733 | // form that can be fed directly into a corresponding "RTPSink" object. |
| 734 | if (verbosityLevel() > 0) { |
| 735 | envir() << "\treturns NULL (because we currently don't support the proxying of \"" |
| 736 | << fClientMediaSubsession.mediumName() << "/" << fCodecName << "\" streams)\n" ; |
| 737 | } |
| 738 | return NULL; |
| 739 | } else if (strcmp(fCodecName, "QCELP" ) == 0 || |
| 740 | strcmp(fCodecName, "H261" ) == 0 || |
| 741 | strcmp(fCodecName, "H263-1998" ) == 0 || strcmp(fCodecName, "H263-2000" ) == 0 || |
| 742 | strcmp(fCodecName, "X-QT" ) == 0 || strcmp(fCodecName, "X-QUICKTIME" ) == 0) { |
| 743 | // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it: |
| 744 | if (verbosityLevel() > 0) { |
| 745 | envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n" ; |
| 746 | } |
| 747 | return NULL; |
| 748 | } else { |
| 749 | // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink": |
| 750 | Boolean allowMultipleFramesPerPacket = True; // by default |
| 751 | Boolean doNormalMBitRule = True; // by default |
| 752 | // Some codecs change the above default parameters: |
| 753 | if (strcmp(fCodecName, "MP2T" ) == 0) { |
| 754 | doNormalMBitRule = False; // no RTP 'M' bit |
| 755 | } |
| 756 | newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, |
| 757 | rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(), |
| 758 | fClientMediaSubsession.mediumName(), fCodecName, |
| 759 | fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule); |
| 760 | } |
| 761 | |
| 762 | // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized, |
| 763 | // we temporarily disable RTCP "SR" reports for this "RTPSink" object: |
| 764 | newSink->enableRTCPReports() = False; |
| 765 | |
| 766 | // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later: |
| 767 | PresentationTimeSubsessionNormalizer* ssNormalizer; |
| 768 | if (strcmp(fCodecName, "H264" ) == 0 || |
| 769 | strcmp(fCodecName, "H265" ) == 0 || |
| 770 | strcmp(fCodecName, "MP4V-ES" ) == 0 || |
| 771 | strcmp(fCodecName, "MPV" ) == 0 || |
| 772 | strcmp(fCodecName, "DV" ) == 0) { |
| 773 | // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it: |
| 774 | ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource()); |
| 775 | } else { |
| 776 | ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource; |
| 777 | } |
| 778 | ssNormalizer->setRTPSink(newSink); |
| 779 | |
| 780 | return newSink; |
| 781 | } |
| 782 | |
| 783 | Groupsock* ProxyServerMediaSubsession::createGroupsock(struct in_addr const& addr, Port port) { |
| 784 | ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; |
| 785 | return parentSession->createGroupsock(addr, port); |
| 786 | } |
| 787 | |
| 788 | RTCPInstance* ProxyServerMediaSubsession |
| 789 | ::createRTCP(Groupsock* RTCPgs, unsigned totSessionBW, /* in kbps */ |
| 790 | unsigned char const* cname, RTPSink* sink) { |
| 791 | ProxyServerMediaSession* parentSession = (ProxyServerMediaSession*)fParentSession; |
| 792 | return parentSession->createRTCP(RTCPgs, totSessionBW, cname, sink); |
| 793 | } |
| 794 | |
| 795 | void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) { |
| 796 | ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler(); |
| 797 | } |
| 798 | |
| 799 | void ProxyServerMediaSubsession::subsessionByeHandler() { |
| 800 | if (verbosityLevel() > 0) { |
| 801 | envir() << *this << ": received RTCP \"BYE\". (The back-end stream has ended.)\n" ; |
| 802 | } |
| 803 | |
| 804 | // This "BYE" signals that our input source has (effectively) closed, so pass this onto the front-end clients: |
| 805 | fHaveSetupStream = False; // hack to stop "PAUSE" getting sent by: |
| 806 | if (fClientMediaSubsession.readSource() != NULL) { |
| 807 | fClientMediaSubsession.readSource()->handleClosure(); |
| 808 | } |
| 809 | |
| 810 | // And then treat this as if we had lost connection to the back-end server, |
| 811 | // and can reestablish streaming from it only by sending another "DESCRIBE": |
| 812 | ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession; |
| 813 | ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient; |
| 814 | proxyRTSPClient->scheduleReset(); |
| 815 | } |
| 816 | |
| 817 | |
| 818 | ////////// PresentationTimeSessionNormalizer and PresentationTimeSubsessionNormalizer implementations ////////// |
| 819 | |
| 820 | // PresentationTimeSessionNormalizer: |
| 821 | |
| 822 | PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env) |
| 823 | : Medium(env), |
| 824 | fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) { |
| 825 | } |
| 826 | |
| 827 | PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() { |
| 828 | while (fSubsessionNormalizers != NULL) { |
| 829 | Medium::close(fSubsessionNormalizers); |
| 830 | } |
| 831 | } |
| 832 | |
| 833 | PresentationTimeSubsessionNormalizer* PresentationTimeSessionNormalizer |
| 834 | ::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource, |
| 835 | char const* codecName) { |
| 836 | fSubsessionNormalizers |
| 837 | = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers); |
| 838 | return fSubsessionNormalizers; |
| 839 | } |
| 840 | |
| 841 | void PresentationTimeSessionNormalizer |
| 842 | ::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer, |
| 843 | struct timeval& toPT, struct timeval const& fromPT) { |
| 844 | Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP(); |
| 845 | |
| 846 | if (!hasBeenSynced) { |
| 847 | // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus |
| 848 | // is already aligned with 'wall-clock' time. Just copy it 'as is' to "toPT": |
| 849 | toPT = fromPT; |
| 850 | } else { |
| 851 | if (fMasterSSNormalizer == NULL) { |
| 852 | // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock' |
| 853 | // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with |
| 854 | // those of the master: |
| 855 | fMasterSSNormalizer = ssNormalizer; |
| 856 | |
| 857 | struct timeval timeNow; |
| 858 | gettimeofday(&timeNow, NULL); |
| 859 | |
| 860 | // Compute: fPTAdjustment = timeNow - fromPT |
| 861 | fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec; |
| 862 | fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec; |
| 863 | // Note: It's OK if one or both of these fields underflows; the result still works out OK later. |
| 864 | } |
| 865 | |
| 866 | // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment |
| 867 | toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1; |
| 868 | toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION; |
| 869 | while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; } |
| 870 | |
| 871 | // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink": |
| 872 | RTPSink* const rtpSink = ssNormalizer->fRTPSink; |
| 873 | if (rtpSink != NULL) { // sanity check; should always be true |
| 874 | rtpSink->enableRTCPReports() = True; |
| 875 | } |
| 876 | } |
| 877 | } |
| 878 | |
| 879 | void PresentationTimeSessionNormalizer |
| 880 | ::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) { |
| 881 | // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"): |
| 882 | if (fSubsessionNormalizers == ssNormalizer) { |
| 883 | fSubsessionNormalizers = fSubsessionNormalizers->fNext; |
| 884 | } else { |
| 885 | PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext); |
| 886 | while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext); |
| 887 | *ssPtrPtr = (*ssPtrPtr)->fNext; |
| 888 | } |
| 889 | } |
| 890 | |
| 891 | // PresentationTimeSubsessionNormalizer: |
| 892 | |
| 893 | PresentationTimeSubsessionNormalizer |
| 894 | ::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource, |
| 895 | char const* codecName, PresentationTimeSubsessionNormalizer* next) |
| 896 | : FramedFilter(parent.envir(), inputSource), |
| 897 | fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) { |
| 898 | } |
| 899 | |
| 900 | PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() { |
| 901 | fParent.removePresentationTimeSubsessionNormalizer(this); |
| 902 | } |
| 903 | |
| 904 | void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize, |
| 905 | unsigned numTruncatedBytes, |
| 906 | struct timeval presentationTime, |
| 907 | unsigned durationInMicroseconds) { |
| 908 | ((PresentationTimeSubsessionNormalizer*)clientData) |
| 909 | ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); |
| 910 | } |
| 911 | |
| 912 | void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize, |
| 913 | unsigned numTruncatedBytes, |
| 914 | struct timeval presentationTime, |
| 915 | unsigned durationInMicroseconds) { |
| 916 | // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed: |
| 917 | fFrameSize = frameSize; |
| 918 | fNumTruncatedBytes = numTruncatedBytes; |
| 919 | fDurationInMicroseconds = durationInMicroseconds; |
| 920 | |
| 921 | fParent.normalizePresentationTime(this, fPresentationTime, presentationTime); |
| 922 | |
| 923 | // Hack for JPEG/RTP proxying. Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them, |
| 924 | // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink": |
| 925 | if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG" ) == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket(); |
| 926 | |
| 927 | // Complete delivery: |
| 928 | FramedSource::afterGetting(this); |
| 929 | } |
| 930 | |
| 931 | void PresentationTimeSubsessionNormalizer::doGetNextFrame() { |
| 932 | fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this); |
| 933 | } |
| 934 | |