| 1 | /********** | 
|---|
| 2 | This library is free software; you can redistribute it and/or modify it under | 
|---|
| 3 | the terms of the GNU Lesser General Public License as published by the | 
|---|
| 4 | Free Software Foundation; either version 3 of the License, or (at your | 
|---|
| 5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) | 
|---|
| 6 |  | 
|---|
| 7 | This library is distributed in the hope that it will be useful, but WITHOUT | 
|---|
| 8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 
|---|
| 9 | FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for | 
|---|
| 10 | more details. | 
|---|
| 11 |  | 
|---|
| 12 | You should have received a copy of the GNU Lesser General Public License | 
|---|
| 13 | along with this library; if not, write to the Free Software Foundation, Inc., | 
|---|
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA | 
|---|
| 15 | **********/ | 
|---|
| 16 | // "liveMedia" | 
|---|
| 17 | // Copyright (c) 1996-2020 Live Networks, Inc.  All rights reserved. | 
|---|
| 18 | // An abstraction of a network interface used for RTP (or RTCP). | 
|---|
| 19 | // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to | 
|---|
| 20 | // be implemented transparently.) | 
|---|
| 21 | // Implementation | 
|---|
| 22 |  | 
|---|
| 23 | #include "RTPInterface.hh" | 
|---|
| 24 | #include <GroupsockHelper.hh> | 
|---|
| 25 | #include <stdio.h> | 
|---|
| 26 |  | 
|---|
| 27 | ////////// Helper Functions - Definition ////////// | 
|---|
| 28 |  | 
|---|
| 29 | // Helper routines and data structures, used to implement | 
|---|
| 30 | // sending/receiving RTP/RTCP over a TCP socket: | 
|---|
| 31 |  | 
|---|
| 32 | // Reading RTP-over-TCP is implemented using two levels of hash tables. | 
|---|
| 33 | // The top-level hash table maps TCP socket numbers to a | 
|---|
| 34 | // "SocketDescriptor" that contains a hash table for each of the | 
|---|
| 35 | // sub-channels that are reading from this socket. | 
|---|
| 36 |  | 
|---|
| 37 | static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) { | 
|---|
| 38 | _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent); | 
|---|
| 39 | if (ourTables == NULL) return NULL; | 
|---|
| 40 |  | 
|---|
| 41 | if (ourTables->socketTable == NULL) { | 
|---|
| 42 | // Create a new socket number -> SocketDescriptor mapping table: | 
|---|
| 43 | ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS); | 
|---|
| 44 | } | 
|---|
| 45 | return (HashTable*)(ourTables->socketTable); | 
|---|
| 46 | } | 
|---|
| 47 |  | 
|---|
| 48 | class SocketDescriptor { | 
|---|
| 49 | public: | 
|---|
| 50 | SocketDescriptor(UsageEnvironment& env, int socketNum); | 
|---|
| 51 | virtual ~SocketDescriptor(); | 
|---|
| 52 |  | 
|---|
| 53 | void registerRTPInterface(unsigned char streamChannelId, | 
|---|
| 54 | RTPInterface* rtpInterface); | 
|---|
| 55 | RTPInterface* lookupRTPInterface(unsigned char streamChannelId); | 
|---|
| 56 | void deregisterRTPInterface(unsigned char streamChannelId); | 
|---|
| 57 |  | 
|---|
| 58 | void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) { | 
|---|
| 59 | fServerRequestAlternativeByteHandler = handler; | 
|---|
| 60 | fServerRequestAlternativeByteHandlerClientData = clientData; | 
|---|
| 61 | } | 
|---|
| 62 |  | 
|---|
| 63 | private: | 
|---|
| 64 | static void tcpReadHandler(SocketDescriptor*, int mask); | 
|---|
| 65 | Boolean tcpReadHandler1(int mask); | 
|---|
| 66 |  | 
|---|
| 67 | private: | 
|---|
| 68 | UsageEnvironment& fEnv; | 
|---|
| 69 | int fOurSocketNum; | 
|---|
| 70 | HashTable* fSubChannelHashTable; | 
|---|
| 71 | ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler; | 
|---|
| 72 | void* fServerRequestAlternativeByteHandlerClientData; | 
|---|
| 73 | u_int8_t fStreamChannelId, fSizeByte1; | 
|---|
| 74 | Boolean fReadErrorOccurred, fDeleteMyselfNext, fAreInReadHandlerLoop; | 
|---|
| 75 | enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState; | 
|---|
| 76 | }; | 
|---|
| 77 |  | 
|---|
| 78 | static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) { | 
|---|
| 79 | HashTable* table = socketHashTable(env, createIfNotFound); | 
|---|
| 80 | if (table == NULL) return NULL; | 
|---|
| 81 |  | 
|---|
| 82 | char const* key = (char const*)(long)sockNum; | 
|---|
| 83 | SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key)); | 
|---|
| 84 | if (socketDescriptor == NULL) { | 
|---|
| 85 | if (createIfNotFound) { | 
|---|
| 86 | socketDescriptor = new SocketDescriptor(env, sockNum); | 
|---|
| 87 | table->Add((char const*)(long)(sockNum), socketDescriptor); | 
|---|
| 88 | } else if (table->IsEmpty()) { | 
|---|
| 89 | // We can also delete the table (to reclaim space): | 
|---|
| 90 | _Tables* ourTables = _Tables::getOurTables(env); | 
|---|
| 91 | delete table; | 
|---|
| 92 | ourTables->socketTable = NULL; | 
|---|
| 93 | ourTables->reclaimIfPossible(); | 
|---|
| 94 | } | 
|---|
| 95 | } | 
|---|
| 96 |  | 
|---|
| 97 | return socketDescriptor; | 
|---|
| 98 | } | 
|---|
| 99 |  | 
|---|
| 100 | static void removeSocketDescription(UsageEnvironment& env, int sockNum) { | 
|---|
| 101 | char const* key = (char const*)(long)sockNum; | 
|---|
| 102 | HashTable* table = socketHashTable(env); | 
|---|
| 103 | table->Remove(key); | 
|---|
| 104 |  | 
|---|
| 105 | if (table->IsEmpty()) { | 
|---|
| 106 | // We can also delete the table (to reclaim space): | 
|---|
| 107 | _Tables* ourTables = _Tables::getOurTables(env); | 
|---|
| 108 | delete table; | 
|---|
| 109 | ourTables->socketTable = NULL; | 
|---|
| 110 | ourTables->reclaimIfPossible(); | 
|---|
| 111 | } | 
|---|
| 112 | } | 
|---|
| 113 |  | 
|---|
| 114 |  | 
|---|
| 115 | ////////// RTPInterface - Implementation ////////// | 
|---|
| 116 |  | 
|---|
| 117 | RTPInterface::RTPInterface(Medium* owner, Groupsock* gs) | 
|---|
| 118 | : fOwner(owner), fGS(gs), | 
|---|
| 119 | fTCPStreams(NULL), | 
|---|
| 120 | fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1), | 
|---|
| 121 | fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL), | 
|---|
| 122 | fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) { | 
|---|
| 123 | // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive. | 
|---|
| 124 | // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block, | 
|---|
| 125 | // even if the socket was previously reported (e.g., by "select()") as having data available. | 
|---|
| 126 | // (This can supposedly happen if the UDP checksum fails, for example.) | 
|---|
| 127 | makeSocketNonBlocking(fGS->socketNum()); | 
|---|
| 128 | increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024); | 
|---|
| 129 | } | 
|---|
| 130 |  | 
|---|
| 131 | RTPInterface::~RTPInterface() { | 
|---|
| 132 | stopNetworkReading(); | 
|---|
| 133 | delete fTCPStreams; | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|
| 136 | void RTPInterface::setStreamSocket(int sockNum, | 
|---|
| 137 | unsigned char streamChannelId) { | 
|---|
| 138 | fGS->removeAllDestinations(); | 
|---|
| 139 | envir().taskScheduler().disableBackgroundHandling(fGS->socketNum()); // turn off any reading on our datagram socket | 
|---|
| 140 | fGS->reset(); // and close our datagram socket, because we won't be using it anymore | 
|---|
| 141 |  | 
|---|
| 142 | addStreamSocket(sockNum, streamChannelId); | 
|---|
| 143 | } | 
|---|
| 144 |  | 
|---|
| 145 | void RTPInterface::addStreamSocket(int sockNum, | 
|---|
| 146 | unsigned char streamChannelId) { | 
|---|
| 147 | if (sockNum < 0) return; | 
|---|
| 148 |  | 
|---|
| 149 | for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; | 
|---|
| 150 | streams = streams->fNext) { | 
|---|
| 151 | if (streams->fStreamSocketNum == sockNum | 
|---|
| 152 | && streams->fStreamChannelId == streamChannelId) { | 
|---|
| 153 | return; // we already have it | 
|---|
| 154 | } | 
|---|
| 155 | } | 
|---|
| 156 |  | 
|---|
| 157 | fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams); | 
|---|
| 158 |  | 
|---|
| 159 | // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP: | 
|---|
| 160 | SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum); | 
|---|
| 161 | socketDescriptor->registerRTPInterface(streamChannelId, this); | 
|---|
| 162 | } | 
|---|
| 163 |  | 
|---|
| 164 | static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) { | 
|---|
| 165 | SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False); | 
|---|
| 166 | if (socketDescriptor != NULL) { | 
|---|
| 167 | socketDescriptor->deregisterRTPInterface(streamChannelId); | 
|---|
| 168 | // Note: This may delete "socketDescriptor", | 
|---|
| 169 | // if no more interfaces are using this socket | 
|---|
| 170 | } | 
|---|
| 171 | } | 
|---|
| 172 |  | 
|---|
| 173 | void RTPInterface::removeStreamSocket(int sockNum, | 
|---|
| 174 | unsigned char streamChannelId) { | 
|---|
| 175 | // Remove - from our list of 'TCP streams' - the record of the (sockNum,streamChannelId) pair. | 
|---|
| 176 | // (However "streamChannelId" == 0xFF is a special case, meaning remove all | 
|---|
| 177 | //  (sockNum,*) pairs.) | 
|---|
| 178 |  | 
|---|
| 179 | while (1) { | 
|---|
| 180 | tcpStreamRecord** streamsPtr = &fTCPStreams; | 
|---|
| 181 |  | 
|---|
| 182 | while (*streamsPtr != NULL) { | 
|---|
| 183 | if ((*streamsPtr)->fStreamSocketNum == sockNum | 
|---|
| 184 | && (streamChannelId == 0xFF || streamChannelId == (*streamsPtr)->fStreamChannelId)) { | 
|---|
| 185 | // Delete the record pointed to by *streamsPtr : | 
|---|
| 186 | unsigned char streamChannelIdToRemove = (*streamsPtr)->fStreamChannelId; | 
|---|
| 187 | tcpStreamRecord* next = (*streamsPtr)->fNext; | 
|---|
| 188 | (*streamsPtr)->fNext = NULL; | 
|---|
| 189 | delete (*streamsPtr); | 
|---|
| 190 | *streamsPtr = next; | 
|---|
| 191 |  | 
|---|
| 192 | // And 'deregister' this socket,channelId pair: | 
|---|
| 193 | deregisterSocket(envir(), sockNum, streamChannelIdToRemove); | 
|---|
| 194 |  | 
|---|
| 195 | if (streamChannelId != 0xFF) return; // we're done | 
|---|
| 196 | break; // start again from the beginning of the list, in case the list has changed | 
|---|
| 197 | } else { | 
|---|
| 198 | streamsPtr = &((*streamsPtr)->fNext); | 
|---|
| 199 | } | 
|---|
| 200 | } | 
|---|
| 201 | if (*streamsPtr == NULL) break; | 
|---|
| 202 | } | 
|---|
| 203 | } | 
|---|
| 204 |  | 
|---|
| 205 | void RTPInterface::setServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum, | 
|---|
| 206 | ServerRequestAlternativeByteHandler* handler, void* clientData) { | 
|---|
| 207 | SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, socketNum, False); | 
|---|
| 208 |  | 
|---|
| 209 | if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData); | 
|---|
| 210 | } | 
|---|
| 211 |  | 
|---|
| 212 | void RTPInterface::clearServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum) { | 
|---|
| 213 | setServerRequestAlternativeByteHandler(env, socketNum, NULL, NULL); | 
|---|
| 214 | } | 
|---|
| 215 |  | 
|---|
| 216 | Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) { | 
|---|
| 217 | Boolean success = True; // we'll return False instead if any of the sends fail | 
|---|
| 218 |  | 
|---|
| 219 | // Normal case: Send as a UDP packet: | 
|---|
| 220 | if (!fGS->output(envir(), packet, packetSize)) success = False; | 
|---|
| 221 |  | 
|---|
| 222 | // Also, send over each of our TCP sockets: | 
|---|
| 223 | tcpStreamRecord* nextStream; | 
|---|
| 224 | for (tcpStreamRecord* stream = fTCPStreams; stream != NULL; stream = nextStream) { | 
|---|
| 225 | nextStream = stream->fNext; // Set this now, in case the following deletes "stream": | 
|---|
| 226 | if (!sendRTPorRTCPPacketOverTCP(packet, packetSize, | 
|---|
| 227 | stream->fStreamSocketNum, stream->fStreamChannelId)) { | 
|---|
| 228 | success = False; | 
|---|
| 229 | } | 
|---|
| 230 | } | 
|---|
| 231 |  | 
|---|
| 232 | return success; | 
|---|
| 233 | } | 
|---|
| 234 |  | 
|---|
| 235 | void RTPInterface | 
|---|
| 236 | ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) { | 
|---|
| 237 | // Normal case: Arrange to read UDP packets: | 
|---|
| 238 | envir().taskScheduler(). | 
|---|
| 239 | turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner); | 
|---|
| 240 |  | 
|---|
| 241 | // Also, receive RTP over TCP, on each of our TCP connections: | 
|---|
| 242 | fReadHandlerProc = handlerProc; | 
|---|
| 243 | for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; | 
|---|
| 244 | streams = streams->fNext) { | 
|---|
| 245 | // Get a socket descriptor for "streams->fStreamSocketNum": | 
|---|
| 246 | SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum); | 
|---|
| 247 |  | 
|---|
| 248 | // Tell it about our subChannel: | 
|---|
| 249 | socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this); | 
|---|
| 250 | } | 
|---|
| 251 | } | 
|---|
| 252 |  | 
|---|
| 253 | Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize, | 
|---|
| 254 | unsigned& bytesRead, struct sockaddr_in& fromAddress, | 
|---|
| 255 | int& tcpSocketNum, unsigned char& tcpStreamChannelId, | 
|---|
| 256 | Boolean& packetReadWasIncomplete) { | 
|---|
| 257 | packetReadWasIncomplete = False; // by default | 
|---|
| 258 | Boolean readSuccess; | 
|---|
| 259 | if (fNextTCPReadStreamSocketNum < 0) { | 
|---|
| 260 | // Normal case: read from the (datagram) 'groupsock': | 
|---|
| 261 | tcpSocketNum = -1; | 
|---|
| 262 | readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress); | 
|---|
| 263 | } else { | 
|---|
| 264 | // Read from the TCP connection: | 
|---|
| 265 | tcpSocketNum = fNextTCPReadStreamSocketNum; | 
|---|
| 266 | tcpStreamChannelId = fNextTCPReadStreamChannelId; | 
|---|
| 267 |  | 
|---|
| 268 | bytesRead = 0; | 
|---|
| 269 | unsigned totBytesToRead = fNextTCPReadSize; | 
|---|
| 270 | if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize; | 
|---|
| 271 | unsigned curBytesToRead = totBytesToRead; | 
|---|
| 272 | int curBytesRead; | 
|---|
| 273 | while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum, | 
|---|
| 274 | &buffer[bytesRead], curBytesToRead, | 
|---|
| 275 | fromAddress)) > 0) { | 
|---|
| 276 | bytesRead += curBytesRead; | 
|---|
| 277 | if (bytesRead >= totBytesToRead) break; | 
|---|
| 278 | curBytesToRead -= curBytesRead; | 
|---|
| 279 | } | 
|---|
| 280 | fNextTCPReadSize -= bytesRead; | 
|---|
| 281 | if (fNextTCPReadSize == 0) { | 
|---|
| 282 | // We've read all of the data that we asked for | 
|---|
| 283 | readSuccess = True; | 
|---|
| 284 | } else if (curBytesRead < 0) { | 
|---|
| 285 | // There was an error reading the socket | 
|---|
| 286 | bytesRead = 0; | 
|---|
| 287 | readSuccess = False; | 
|---|
| 288 | } else { | 
|---|
| 289 | // We need to read more bytes, and there was not an error reading the socket | 
|---|
| 290 | packetReadWasIncomplete = True; | 
|---|
| 291 | return True; | 
|---|
| 292 | } | 
|---|
| 293 | fNextTCPReadStreamSocketNum = -1; // default, for next time | 
|---|
| 294 | } | 
|---|
| 295 |  | 
|---|
| 296 | if (readSuccess && fAuxReadHandlerFunc != NULL) { | 
|---|
| 297 | // Also pass the newly-read packet data to our auxilliary handler: | 
|---|
| 298 | (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead); | 
|---|
| 299 | } | 
|---|
| 300 | return readSuccess; | 
|---|
| 301 | } | 
|---|
| 302 |  | 
|---|
| 303 | void RTPInterface::stopNetworkReading() { | 
|---|
| 304 | // Normal case | 
|---|
| 305 | if (fGS != NULL) envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum()); | 
|---|
| 306 |  | 
|---|
| 307 | // Also turn off read handling on each of our TCP connections: | 
|---|
| 308 | for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { | 
|---|
| 309 | deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId); | 
|---|
| 310 | } | 
|---|
| 311 | } | 
|---|
| 312 |  | 
|---|
| 313 |  | 
|---|
| 314 | ////////// Helper Functions - Implementation ///////// | 
|---|
| 315 |  | 
|---|
| 316 | Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize, | 
|---|
| 317 | int socketNum, unsigned char streamChannelId) { | 
|---|
| 318 | #ifdef DEBUG_SEND | 
|---|
| 319 | fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n", | 
|---|
| 320 | packetSize, streamChannelId, socketNum); fflush(stderr); | 
|---|
| 321 | #endif | 
|---|
| 322 | // Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12: | 
|---|
| 323 | //     $<streamChannelId><packetSize><packet> | 
|---|
| 324 | // (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force | 
|---|
| 325 | // the subsequent "send()" for the <packet> data to succeed, even if we have to do so with | 
|---|
| 326 | // a blocking "send()".) | 
|---|
| 327 | do { | 
|---|
| 328 | u_int8_t [4]; | 
|---|
| 329 | framingHeader[0] = '$'; | 
|---|
| 330 | framingHeader[1] = streamChannelId; | 
|---|
| 331 | framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8); | 
|---|
| 332 | framingHeader[3] = (u_int8_t) (packetSize&0xFF); | 
|---|
| 333 | if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break; | 
|---|
| 334 |  | 
|---|
| 335 | if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break; | 
|---|
| 336 | #ifdef DEBUG_SEND | 
|---|
| 337 | fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr); | 
|---|
| 338 | #endif | 
|---|
| 339 |  | 
|---|
| 340 | return True; | 
|---|
| 341 | } while (0); | 
|---|
| 342 |  | 
|---|
| 343 | #ifdef DEBUG_SEND | 
|---|
| 344 | fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr); | 
|---|
| 345 | #endif | 
|---|
| 346 | return False; | 
|---|
| 347 | } | 
|---|
| 348 |  | 
|---|
| 349 | #ifndef RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS | 
|---|
| 350 | #define RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS 500 | 
|---|
| 351 | #endif | 
|---|
| 352 |  | 
|---|
| 353 | Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) { | 
|---|
| 354 | int sendResult = send(socketNum, (char const*)data, dataSize, 0/*flags*/); | 
|---|
| 355 | if (sendResult < (int)dataSize) { | 
|---|
| 356 | // The TCP send() failed - at least partially. | 
|---|
| 357 |  | 
|---|
| 358 | unsigned numBytesSentSoFar = sendResult < 0 ? 0 : (unsigned)sendResult; | 
|---|
| 359 | if (numBytesSentSoFar > 0 || (forceSendToSucceed && envir().getErrno() == EAGAIN)) { | 
|---|
| 360 | // The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded | 
|---|
| 361 | // the capacity of the TCP connection!). | 
|---|
| 362 | // Force this data write to succeed, by blocking if necessary until it does: | 
|---|
| 363 | unsigned numBytesRemainingToSend = dataSize - numBytesSentSoFar; | 
|---|
| 364 | #ifdef DEBUG_SEND | 
|---|
| 365 | fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", numBytesRemainingToSend); fflush(stderr); | 
|---|
| 366 | #endif | 
|---|
| 367 | makeSocketBlocking(socketNum, RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS); | 
|---|
| 368 | sendResult = send(socketNum, (char const*)(&data[numBytesSentSoFar]), numBytesRemainingToSend, 0/*flags*/); | 
|---|
| 369 | if ((unsigned)sendResult != numBytesRemainingToSend) { | 
|---|
| 370 | // The blocking "send()" failed, or timed out.  In either case, we assume that the | 
|---|
| 371 | // TCP connection has failed (or is 'hanging' indefinitely), and we stop using it | 
|---|
| 372 | // (for both RTP and RTP). | 
|---|
| 373 | // (If we kept using the socket here, the RTP or RTCP packet write would be in an | 
|---|
| 374 | //  incomplete, inconsistent state.) | 
|---|
| 375 | #ifdef DEBUG_SEND | 
|---|
| 376 | fprintf(stderr, "sendDataOverTCP: blocking send() failed (delivering %d bytes out of %d); closing socket %d\n", sendResult, numBytesRemainingToSend, socketNum); fflush(stderr); | 
|---|
| 377 | #endif | 
|---|
| 378 | removeStreamSocket(socketNum, 0xFF); | 
|---|
| 379 | return False; | 
|---|
| 380 | } | 
|---|
| 381 | makeSocketNonBlocking(socketNum); | 
|---|
| 382 |  | 
|---|
| 383 | return True; | 
|---|
| 384 | } else if (sendResult < 0 && envir().getErrno() != EAGAIN) { | 
|---|
| 385 | // Because the "send()" call failed, assume that the socket is now unusable, so stop | 
|---|
| 386 | // using it (for both RTP and RTCP): | 
|---|
| 387 | removeStreamSocket(socketNum, 0xFF); | 
|---|
| 388 | } | 
|---|
| 389 |  | 
|---|
| 390 | return False; | 
|---|
| 391 | } | 
|---|
| 392 |  | 
|---|
| 393 | return True; | 
|---|
| 394 | } | 
|---|
| 395 |  | 
|---|
| 396 | SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum) | 
|---|
| 397 | :fEnv(env), fOurSocketNum(socketNum), | 
|---|
| 398 | fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)), | 
|---|
| 399 | fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL), | 
|---|
| 400 | fReadErrorOccurred(False), fDeleteMyselfNext(False), fAreInReadHandlerLoop(False), fTCPReadingState(AWAITING_DOLLAR) { | 
|---|
| 401 | } | 
|---|
| 402 |  | 
|---|
| 403 | SocketDescriptor::~SocketDescriptor() { | 
|---|
| 404 | fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum); | 
|---|
| 405 | removeSocketDescription(fEnv, fOurSocketNum); | 
|---|
| 406 |  | 
|---|
| 407 | if (fSubChannelHashTable != NULL) { | 
|---|
| 408 | // Remove knowledge of this socket from any "RTPInterface"s that are using it: | 
|---|
| 409 | HashTable::Iterator* iter = HashTable::Iterator::create(*fSubChannelHashTable); | 
|---|
| 410 | RTPInterface* rtpInterface; | 
|---|
| 411 | char const* key; | 
|---|
| 412 |  | 
|---|
| 413 | while ((rtpInterface = (RTPInterface*)(iter->next(key))) != NULL) { | 
|---|
| 414 | u_int64_t streamChannelIdLong = (u_int64_t)key; | 
|---|
| 415 | unsigned char streamChannelId = (unsigned char)streamChannelIdLong; | 
|---|
| 416 |  | 
|---|
| 417 | rtpInterface->removeStreamSocket(fOurSocketNum, streamChannelId); | 
|---|
| 418 | } | 
|---|
| 419 | delete iter; | 
|---|
| 420 |  | 
|---|
| 421 | // Then remove the hash table entries themselves, and then remove the hash table: | 
|---|
| 422 | while (fSubChannelHashTable->RemoveNext() != NULL) {} | 
|---|
| 423 | delete fSubChannelHashTable; | 
|---|
| 424 | } | 
|---|
| 425 |  | 
|---|
| 426 | // Finally: | 
|---|
| 427 | if (fServerRequestAlternativeByteHandler != NULL) { | 
|---|
| 428 | // Hack: Pass a special character to our alternative byte handler, to tell it that either | 
|---|
| 429 | // - an error occurred when reading the TCP socket, or | 
|---|
| 430 | // - no error occurred, but it needs to take over control of the TCP socket once again. | 
|---|
| 431 | u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE; | 
|---|
| 432 | (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar); | 
|---|
| 433 | } | 
|---|
| 434 | } | 
|---|
| 435 |  | 
|---|
| 436 | void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId, | 
|---|
| 437 | RTPInterface* rtpInterface) { | 
|---|
| 438 | Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty(); | 
|---|
| 439 | #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE) | 
|---|
| 440 | fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration); | 
|---|
| 441 | #endif | 
|---|
| 442 | fSubChannelHashTable->Add((char const*)(long)streamChannelId, | 
|---|
| 443 | rtpInterface); | 
|---|
| 444 |  | 
|---|
| 445 | if (isFirstRegistration) { | 
|---|
| 446 | // Arrange to handle reads on this TCP socket: | 
|---|
| 447 | TaskScheduler::BackgroundHandlerProc* handler | 
|---|
| 448 | = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler; | 
|---|
| 449 | fEnv.taskScheduler(). | 
|---|
| 450 | setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this); | 
|---|
| 451 | } | 
|---|
| 452 | } | 
|---|
| 453 |  | 
|---|
| 454 | RTPInterface* SocketDescriptor | 
|---|
| 455 | ::lookupRTPInterface(unsigned char streamChannelId) { | 
|---|
| 456 | char const* lookupArg = (char const*)(long)streamChannelId; | 
|---|
| 457 | return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg)); | 
|---|
| 458 | } | 
|---|
| 459 |  | 
|---|
| 460 | void SocketDescriptor | 
|---|
| 461 | ::deregisterRTPInterface(unsigned char streamChannelId) { | 
|---|
| 462 | #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE) | 
|---|
| 463 | fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId); | 
|---|
| 464 | #endif | 
|---|
| 465 | fSubChannelHashTable->Remove((char const*)(long)streamChannelId); | 
|---|
| 466 |  | 
|---|
| 467 | if (fSubChannelHashTable->IsEmpty()) { | 
|---|
| 468 | // No more interfaces are using us, so it's curtains for us now: | 
|---|
| 469 | if (fAreInReadHandlerLoop) { | 
|---|
| 470 | fDeleteMyselfNext = True; // we can't delete ourself yet, but we'll do so from "tcpReadHandler()" below | 
|---|
| 471 | } else { | 
|---|
| 472 | delete this; | 
|---|
| 473 | } | 
|---|
| 474 | } | 
|---|
| 475 | } | 
|---|
| 476 |  | 
|---|
| 477 | void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) { | 
|---|
| 478 | // Call the read handler until it returns false, with a limit to avoid starving other sockets | 
|---|
| 479 | unsigned count = 2000; | 
|---|
| 480 | socketDescriptor->fAreInReadHandlerLoop = True; | 
|---|
| 481 | while (!socketDescriptor->fDeleteMyselfNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {} | 
|---|
| 482 | socketDescriptor->fAreInReadHandlerLoop = False; | 
|---|
| 483 | if (socketDescriptor->fDeleteMyselfNext) delete socketDescriptor; | 
|---|
| 484 | } | 
|---|
| 485 |  | 
|---|
| 486 | Boolean SocketDescriptor::tcpReadHandler1(int mask) { | 
|---|
| 487 | // We expect the following data over the TCP channel: | 
|---|
| 488 | //   optional RTSP command or response bytes (before the first '$' character) | 
|---|
| 489 | //   a '$' character | 
|---|
| 490 | //   a 1-byte channel id | 
|---|
| 491 | //   a 2-byte packet size (in network byte order) | 
|---|
| 492 | //   the packet data. | 
|---|
| 493 | // However, because the socket is being read asynchronously, this data might arrive in pieces. | 
|---|
| 494 |  | 
|---|
| 495 | u_int8_t c; | 
|---|
| 496 | struct sockaddr_in fromAddress; | 
|---|
| 497 | if (fTCPReadingState != AWAITING_PACKET_DATA) { | 
|---|
| 498 | int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress); | 
|---|
| 499 | if (result == 0) { // There was no more data to read | 
|---|
| 500 | return False; | 
|---|
| 501 | } else if (result != 1) { // error reading TCP socket, so we will no longer handle it | 
|---|
| 502 | #ifdef DEBUG_RECEIVE | 
|---|
| 503 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result); | 
|---|
| 504 | #endif | 
|---|
| 505 | fReadErrorOccurred = True; | 
|---|
| 506 | fDeleteMyselfNext = True; | 
|---|
| 507 | return False; | 
|---|
| 508 | } | 
|---|
| 509 | } | 
|---|
| 510 |  | 
|---|
| 511 | Boolean callAgain = True; | 
|---|
| 512 | switch (fTCPReadingState) { | 
|---|
| 513 | case AWAITING_DOLLAR: { | 
|---|
| 514 | if (c == '$') { | 
|---|
| 515 | #ifdef DEBUG_RECEIVE | 
|---|
| 516 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum); | 
|---|
| 517 | #endif | 
|---|
| 518 | fTCPReadingState = AWAITING_STREAM_CHANNEL_ID; | 
|---|
| 519 | } else { | 
|---|
| 520 | // This character is part of a RTSP request or command, which is handled separately: | 
|---|
| 521 | if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) { | 
|---|
| 522 | // Hack: 0xFF and 0xFE are used as special signaling characters, so don't send them | 
|---|
| 523 | (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c); | 
|---|
| 524 | } | 
|---|
| 525 | } | 
|---|
| 526 | break; | 
|---|
| 527 | } | 
|---|
| 528 | case AWAITING_STREAM_CHANNEL_ID: { | 
|---|
| 529 | // The byte that we read is the stream channel id. | 
|---|
| 530 | if (lookupRTPInterface(c) != NULL) { // sanity check | 
|---|
| 531 | fStreamChannelId = c; | 
|---|
| 532 | fTCPReadingState = AWAITING_SIZE1; | 
|---|
| 533 | } else { | 
|---|
| 534 | // This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover: | 
|---|
| 535 | #ifdef DEBUG_RECEIVE | 
|---|
| 536 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c); | 
|---|
| 537 | #endif | 
|---|
| 538 | fTCPReadingState = AWAITING_DOLLAR; | 
|---|
| 539 | } | 
|---|
| 540 | break; | 
|---|
| 541 | } | 
|---|
| 542 | case AWAITING_SIZE1: { | 
|---|
| 543 | // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'. | 
|---|
| 544 | fSizeByte1 = c; | 
|---|
| 545 | fTCPReadingState = AWAITING_SIZE2; | 
|---|
| 546 | break; | 
|---|
| 547 | } | 
|---|
| 548 | case AWAITING_SIZE2: { | 
|---|
| 549 | // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'. | 
|---|
| 550 | unsigned short size = (fSizeByte1<<8)|c; | 
|---|
| 551 |  | 
|---|
| 552 | // Record the information about the packet data that will be read next: | 
|---|
| 553 | RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); | 
|---|
| 554 | if (rtpInterface != NULL) { | 
|---|
| 555 | rtpInterface->fNextTCPReadSize = size; | 
|---|
| 556 | rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum; | 
|---|
| 557 | rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId; | 
|---|
| 558 | } | 
|---|
| 559 | fTCPReadingState = AWAITING_PACKET_DATA; | 
|---|
| 560 | break; | 
|---|
| 561 | } | 
|---|
| 562 | case AWAITING_PACKET_DATA: { | 
|---|
| 563 | callAgain = False; | 
|---|
| 564 | fTCPReadingState = AWAITING_DOLLAR; // the next state, unless we end up having to read more data in the current state | 
|---|
| 565 | // Call the appropriate read handler to get the packet data from the TCP stream: | 
|---|
| 566 | RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); | 
|---|
| 567 | if (rtpInterface != NULL) { | 
|---|
| 568 | if (rtpInterface->fNextTCPReadSize == 0) { | 
|---|
| 569 | // We've already read all the data for this packet. | 
|---|
| 570 | break; | 
|---|
| 571 | } | 
|---|
| 572 | if (rtpInterface->fReadHandlerProc != NULL) { | 
|---|
| 573 | #ifdef DEBUG_RECEIVE | 
|---|
| 574 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId); | 
|---|
| 575 | #endif | 
|---|
| 576 | fTCPReadingState = AWAITING_PACKET_DATA; | 
|---|
| 577 | rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask); | 
|---|
| 578 | } else { | 
|---|
| 579 | #ifdef DEBUG_RECEIVE | 
|---|
| 580 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize); | 
|---|
| 581 | #endif | 
|---|
| 582 | int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress); | 
|---|
| 583 | if (result < 0) { // error reading TCP socket, so we will no longer handle it | 
|---|
| 584 | #ifdef DEBUG_RECEIVE | 
|---|
| 585 | fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result); | 
|---|
| 586 | #endif | 
|---|
| 587 | fReadErrorOccurred = True; | 
|---|
| 588 | fDeleteMyselfNext = True; | 
|---|
| 589 | return False; | 
|---|
| 590 | } else { | 
|---|
| 591 | fTCPReadingState = AWAITING_PACKET_DATA; | 
|---|
| 592 | if (result == 1) { | 
|---|
| 593 | --rtpInterface->fNextTCPReadSize; | 
|---|
| 594 | callAgain = True; | 
|---|
| 595 | } | 
|---|
| 596 | } | 
|---|
| 597 | } | 
|---|
| 598 | } | 
|---|
| 599 | #ifdef DEBUG_RECEIVE | 
|---|
| 600 | else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId); | 
|---|
| 601 | #endif | 
|---|
| 602 | } | 
|---|
| 603 | } | 
|---|
| 604 |  | 
|---|
| 605 | return callAgain; | 
|---|
| 606 | } | 
|---|
| 607 |  | 
|---|
| 608 |  | 
|---|
| 609 | ////////// tcpStreamRecord implementation ////////// | 
|---|
| 610 |  | 
|---|
| 611 | tcpStreamRecord | 
|---|
| 612 | ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId, | 
|---|
| 613 | tcpStreamRecord* next) | 
|---|
| 614 | : fNext(next), | 
|---|
| 615 | fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) { | 
|---|
| 616 | } | 
|---|
| 617 |  | 
|---|
| 618 | tcpStreamRecord::~tcpStreamRecord() { | 
|---|
| 619 | delete fNext; | 
|---|
| 620 | } | 
|---|
| 621 |  | 
|---|