1/**********
2This library is free software; you can redistribute it and/or modify it under
3the terms of the GNU Lesser General Public License as published by the
4Free Software Foundation; either version 3 of the License, or (at your
5option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7This library is distributed in the hope that it will be useful, but WITHOUT
8ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10more details.
11
12You should have received a copy of the GNU Lesser General Public License
13along with this library; if not, write to the Free Software Foundation, Inc.,
1451 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15**********/
16// "liveMedia"
17// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
18// An abstraction of a network interface used for RTP (or RTCP).
19// (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
20// be implemented transparently.)
21// Implementation
22
23#include "RTPInterface.hh"
24#include <GroupsockHelper.hh>
25#include <stdio.h>
26
27////////// Helper Functions - Definition //////////
28
29// Helper routines and data structures, used to implement
30// sending/receiving RTP/RTCP over a TCP socket:
31
32// Reading RTP-over-TCP is implemented using two levels of hash tables.
33// The top-level hash table maps TCP socket numbers to a
34// "SocketDescriptor" that contains a hash table for each of the
35// sub-channels that are reading from this socket.
36
37static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
38 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
39 if (ourTables == NULL) return NULL;
40
41 if (ourTables->socketTable == NULL) {
42 // Create a new socket number -> SocketDescriptor mapping table:
43 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
44 }
45 return (HashTable*)(ourTables->socketTable);
46}
47
// Per-TCP-socket state used for RTP/RTCP-over-TCP.
// Each "SocketDescriptor" owns a hash table that maps a stream channel id to the
// "RTPInterface" registered for that channel, and a small state machine
// ("fTCPReadingState") that parses the '$'-framed data arriving on the socket.
class SocketDescriptor {
public:
  SocketDescriptor(UsageEnvironment& env, int socketNum);
  virtual ~SocketDescriptor();

  // Associates "rtpInterface" with "streamChannelId" on this socket.
  void registerRTPInterface(unsigned char streamChannelId,
			    RTPInterface* rtpInterface);
  // Returns the interface registered for "streamChannelId", or NULL if there is none.
  RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
  // Removes the registration for "streamChannelId".
  // Note: this deletes (or schedules deletion of) this object once no registrations remain.
  void deregisterRTPInterface(unsigned char streamChannelId);

  // Sets the handler (and its client data) that receives the bytes read from the socket
  // that are not part of a '$'-framed packet.  Passing NULL clears the handler.
  void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
    fServerRequestAlternativeByteHandler = handler;
    fServerRequestAlternativeByteHandlerClientData = clientData;
  }

private:
  // Background-read callback installed on the socket; repeatedly calls "tcpReadHandler1()":
  static void tcpReadHandler(SocketDescriptor*, int mask);
  // Performs one step of the framing state machine; returns True if it should be called again:
  Boolean tcpReadHandler1(int mask);

private:
  UsageEnvironment& fEnv;
  int fOurSocketNum;
  HashTable* fSubChannelHashTable; // stream channel id -> RTPInterface*
  ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
  void* fServerRequestAlternativeByteHandlerClientData;
  u_int8_t fStreamChannelId, fSizeByte1; // channel id and high size byte of the frame currently being parsed
  // fReadErrorOccurred: a socket read failed (reported to the alternative byte handler on destruction)
  // fDeleteMyselfNext: deletion was requested while inside the read-handler loop
  // fAreInReadHandlerLoop: "tcpReadHandler()" is currently running on this object
  Boolean fReadErrorOccurred, fDeleteMyselfNext, fAreInReadHandlerLoop;
  enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
};
77
78static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
79 HashTable* table = socketHashTable(env, createIfNotFound);
80 if (table == NULL) return NULL;
81
82 char const* key = (char const*)(long)sockNum;
83 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
84 if (socketDescriptor == NULL) {
85 if (createIfNotFound) {
86 socketDescriptor = new SocketDescriptor(env, sockNum);
87 table->Add((char const*)(long)(sockNum), socketDescriptor);
88 } else if (table->IsEmpty()) {
89 // We can also delete the table (to reclaim space):
90 _Tables* ourTables = _Tables::getOurTables(env);
91 delete table;
92 ourTables->socketTable = NULL;
93 ourTables->reclaimIfPossible();
94 }
95 }
96
97 return socketDescriptor;
98}
99
100static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
101 char const* key = (char const*)(long)sockNum;
102 HashTable* table = socketHashTable(env);
103 table->Remove(key);
104
105 if (table->IsEmpty()) {
106 // We can also delete the table (to reclaim space):
107 _Tables* ourTables = _Tables::getOurTables(env);
108 delete table;
109 ourTables->socketTable = NULL;
110 ourTables->reclaimIfPossible();
111 }
112}
113
114
115////////// RTPInterface - Implementation //////////
116
117RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
118 : fOwner(owner), fGS(gs),
119 fTCPStreams(NULL),
120 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
121 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
122 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
123 // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
124 // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
125 // even if the socket was previously reported (e.g., by "select()") as having data available.
126 // (This can supposedly happen if the UDP checksum fails, for example.)
127 makeSocketNonBlocking(fGS->socketNum());
128 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
129}
130
// Stops all read handling (UDP and TCP) for this interface, then frees its list of TCP stream records.
RTPInterface::~RTPInterface() {
  // This must come first: it walks "fTCPStreams" to deregister each (socket, channel) pair.
  stopNetworkReading();
  // Deleting the head record deletes the whole list (see "tcpStreamRecord::~tcpStreamRecord()"):
  delete fTCPStreams;
}
135
136void RTPInterface::setStreamSocket(int sockNum,
137 unsigned char streamChannelId) {
138 fGS->removeAllDestinations();
139 envir().taskScheduler().disableBackgroundHandling(fGS->socketNum()); // turn off any reading on our datagram socket
140 fGS->reset(); // and close our datagram socket, because we won't be using it anymore
141
142 addStreamSocket(sockNum, streamChannelId);
143}
144
145void RTPInterface::addStreamSocket(int sockNum,
146 unsigned char streamChannelId) {
147 if (sockNum < 0) return;
148
149 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
150 streams = streams->fNext) {
151 if (streams->fStreamSocketNum == sockNum
152 && streams->fStreamChannelId == streamChannelId) {
153 return; // we already have it
154 }
155 }
156
157 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
158
159 // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP:
160 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
161 socketDescriptor->registerRTPInterface(streamChannelId, this);
162}
163
164static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
165 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
166 if (socketDescriptor != NULL) {
167 socketDescriptor->deregisterRTPInterface(streamChannelId);
168 // Note: This may delete "socketDescriptor",
169 // if no more interfaces are using this socket
170 }
171}
172
173void RTPInterface::removeStreamSocket(int sockNum,
174 unsigned char streamChannelId) {
175 // Remove - from our list of 'TCP streams' - the record of the (sockNum,streamChannelId) pair.
176 // (However "streamChannelId" == 0xFF is a special case, meaning remove all
177 // (sockNum,*) pairs.)
178
179 while (1) {
180 tcpStreamRecord** streamsPtr = &fTCPStreams;
181
182 while (*streamsPtr != NULL) {
183 if ((*streamsPtr)->fStreamSocketNum == sockNum
184 && (streamChannelId == 0xFF || streamChannelId == (*streamsPtr)->fStreamChannelId)) {
185 // Delete the record pointed to by *streamsPtr :
186 unsigned char streamChannelIdToRemove = (*streamsPtr)->fStreamChannelId;
187 tcpStreamRecord* next = (*streamsPtr)->fNext;
188 (*streamsPtr)->fNext = NULL;
189 delete (*streamsPtr);
190 *streamsPtr = next;
191
192 // And 'deregister' this socket,channelId pair:
193 deregisterSocket(envir(), sockNum, streamChannelIdToRemove);
194
195 if (streamChannelId != 0xFF) return; // we're done
196 break; // start again from the beginning of the list, in case the list has changed
197 } else {
198 streamsPtr = &((*streamsPtr)->fNext);
199 }
200 }
201 if (*streamsPtr == NULL) break;
202 }
203}
204
205void RTPInterface::setServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum,
206 ServerRequestAlternativeByteHandler* handler, void* clientData) {
207 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, socketNum, False);
208
209 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
210}
211
// Clears any alternative byte handler that was set for "socketNum",
// by installing a NULL handler with NULL client data.
void RTPInterface::clearServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum) {
  setServerRequestAlternativeByteHandler(env, socketNum, NULL, NULL);
}
215
216Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
217 Boolean success = True; // we'll return False instead if any of the sends fail
218
219 // Normal case: Send as a UDP packet:
220 if (!fGS->output(envir(), packet, packetSize)) success = False;
221
222 // Also, send over each of our TCP sockets:
223 tcpStreamRecord* nextStream;
224 for (tcpStreamRecord* stream = fTCPStreams; stream != NULL; stream = nextStream) {
225 nextStream = stream->fNext; // Set this now, in case the following deletes "stream":
226 if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
227 stream->fStreamSocketNum, stream->fStreamChannelId)) {
228 success = False;
229 }
230 }
231
232 return success;
233}
234
235void RTPInterface
236::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
237 // Normal case: Arrange to read UDP packets:
238 envir().taskScheduler().
239 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
240
241 // Also, receive RTP over TCP, on each of our TCP connections:
242 fReadHandlerProc = handlerProc;
243 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
244 streams = streams->fNext) {
245 // Get a socket descriptor for "streams->fStreamSocketNum":
246 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
247
248 // Tell it about our subChannel:
249 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
250 }
251}
252
253Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
254 unsigned& bytesRead, struct sockaddr_in& fromAddress,
255 int& tcpSocketNum, unsigned char& tcpStreamChannelId,
256 Boolean& packetReadWasIncomplete) {
257 packetReadWasIncomplete = False; // by default
258 Boolean readSuccess;
259 if (fNextTCPReadStreamSocketNum < 0) {
260 // Normal case: read from the (datagram) 'groupsock':
261 tcpSocketNum = -1;
262 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
263 } else {
264 // Read from the TCP connection:
265 tcpSocketNum = fNextTCPReadStreamSocketNum;
266 tcpStreamChannelId = fNextTCPReadStreamChannelId;
267
268 bytesRead = 0;
269 unsigned totBytesToRead = fNextTCPReadSize;
270 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
271 unsigned curBytesToRead = totBytesToRead;
272 int curBytesRead;
273 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
274 &buffer[bytesRead], curBytesToRead,
275 fromAddress)) > 0) {
276 bytesRead += curBytesRead;
277 if (bytesRead >= totBytesToRead) break;
278 curBytesToRead -= curBytesRead;
279 }
280 fNextTCPReadSize -= bytesRead;
281 if (fNextTCPReadSize == 0) {
282 // We've read all of the data that we asked for
283 readSuccess = True;
284 } else if (curBytesRead < 0) {
285 // There was an error reading the socket
286 bytesRead = 0;
287 readSuccess = False;
288 } else {
289 // We need to read more bytes, and there was not an error reading the socket
290 packetReadWasIncomplete = True;
291 return True;
292 }
293 fNextTCPReadStreamSocketNum = -1; // default, for next time
294 }
295
296 if (readSuccess && fAuxReadHandlerFunc != NULL) {
297 // Also pass the newly-read packet data to our auxilliary handler:
298 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
299 }
300 return readSuccess;
301}
302
303void RTPInterface::stopNetworkReading() {
304 // Normal case
305 if (fGS != NULL) envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
306
307 // Also turn off read handling on each of our TCP connections:
308 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {
309 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
310 }
311}
312
313
314////////// Helper Functions - Implementation /////////
315
316Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
317 int socketNum, unsigned char streamChannelId) {
318#ifdef DEBUG_SEND
319 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
320 packetSize, streamChannelId, socketNum); fflush(stderr);
321#endif
322 // Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
323 // $<streamChannelId><packetSize><packet>
324 // (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force
325 // the subsequent "send()" for the <packet> data to succeed, even if we have to do so with
326 // a blocking "send()".)
327 do {
328 u_int8_t framingHeader[4];
329 framingHeader[0] = '$';
330 framingHeader[1] = streamChannelId;
331 framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
332 framingHeader[3] = (u_int8_t) (packetSize&0xFF);
333 if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
334
335 if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
336#ifdef DEBUG_SEND
337 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
338#endif
339
340 return True;
341 } while (0);
342
343#ifdef DEBUG_SEND
344 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
345#endif
346 return False;
347}
348
// The value passed to "makeSocketBlocking()" when "sendDataOverTCP()" has to fall back to a
// blocking "send()".  (Presumably a write timeout, in milliseconds - as the name suggests.)
// It can be overridden by defining this macro at compile time.
#ifndef RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS
#define RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS 500
#endif
352
353Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
354 int sendResult = send(socketNum, (char const*)data, dataSize, 0/*flags*/);
355 if (sendResult < (int)dataSize) {
356 // The TCP send() failed - at least partially.
357
358 unsigned numBytesSentSoFar = sendResult < 0 ? 0 : (unsigned)sendResult;
359 if (numBytesSentSoFar > 0 || (forceSendToSucceed && envir().getErrno() == EAGAIN)) {
360 // The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded
361 // the capacity of the TCP connection!).
362 // Force this data write to succeed, by blocking if necessary until it does:
363 unsigned numBytesRemainingToSend = dataSize - numBytesSentSoFar;
364#ifdef DEBUG_SEND
365 fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", numBytesRemainingToSend); fflush(stderr);
366#endif
367 makeSocketBlocking(socketNum, RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS);
368 sendResult = send(socketNum, (char const*)(&data[numBytesSentSoFar]), numBytesRemainingToSend, 0/*flags*/);
369 if ((unsigned)sendResult != numBytesRemainingToSend) {
370 // The blocking "send()" failed, or timed out. In either case, we assume that the
371 // TCP connection has failed (or is 'hanging' indefinitely), and we stop using it
372 // (for both RTP and RTP).
373 // (If we kept using the socket here, the RTP or RTCP packet write would be in an
374 // incomplete, inconsistent state.)
375#ifdef DEBUG_SEND
376 fprintf(stderr, "sendDataOverTCP: blocking send() failed (delivering %d bytes out of %d); closing socket %d\n", sendResult, numBytesRemainingToSend, socketNum); fflush(stderr);
377#endif
378 removeStreamSocket(socketNum, 0xFF);
379 return False;
380 }
381 makeSocketNonBlocking(socketNum);
382
383 return True;
384 } else if (sendResult < 0 && envir().getErrno() != EAGAIN) {
385 // Because the "send()" call failed, assume that the socket is now unusable, so stop
386 // using it (for both RTP and RTCP):
387 removeStreamSocket(socketNum, 0xFF);
388 }
389
390 return False;
391 }
392
393 return True;
394}
395
396SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
397 :fEnv(env), fOurSocketNum(socketNum),
398 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
399 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
400 fReadErrorOccurred(False), fDeleteMyselfNext(False), fAreInReadHandlerLoop(False), fTCPReadingState(AWAITING_DOLLAR) {
401}
402
// Tears down this descriptor: stops read handling on the socket, removes the socket's entry
// from the environment's socket table, detaches every "RTPInterface" that is still registered,
// and finally notifies the alternative byte handler (if any).
// Note: The order of these steps matters, because several of them call back into code that
// looks up this socket's descriptor again.
SocketDescriptor::~SocketDescriptor() {
  fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
  // Remove our table entry first, so that the "removeStreamSocket()" calls below
  // (which look up the descriptor for this socket) no longer find this object:
  removeSocketDescription(fEnv, fOurSocketNum);

  if (fSubChannelHashTable != NULL) {
    // Remove knowledge of this socket from any "RTPInterface"s that are using it:
    HashTable::Iterator* iter = HashTable::Iterator::create(*fSubChannelHashTable);
    RTPInterface* rtpInterface;
    char const* key;

    while ((rtpInterface = (RTPInterface*)(iter->next(key))) != NULL) {
      // The hash key is the stream channel id, stored directly in the pointer-sized key:
      u_int64_t streamChannelIdLong = (u_int64_t)key;
      unsigned char streamChannelId = (unsigned char)streamChannelIdLong;

      rtpInterface->removeStreamSocket(fOurSocketNum, streamChannelId);
    }
    delete iter;

    // Then remove the hash table entries themselves, and then remove the hash table:
    while (fSubChannelHashTable->RemoveNext() != NULL) {}
    delete fSubChannelHashTable;
  }

  // Finally:
  if (fServerRequestAlternativeByteHandler != NULL) {
    // Hack: Pass a special character to our alternative byte handler, to tell it that either
    // - an error occurred when reading the TCP socket, or
    // - no error occurred, but it needs to take over control of the TCP socket once again.
    u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE;
    (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar);
  }
}
435
436void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
437 RTPInterface* rtpInterface) {
438 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
439#if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
440 fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration);
441#endif
442 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
443 rtpInterface);
444
445 if (isFirstRegistration) {
446 // Arrange to handle reads on this TCP socket:
447 TaskScheduler::BackgroundHandlerProc* handler
448 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
449 fEnv.taskScheduler().
450 setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this);
451 }
452}
453
454RTPInterface* SocketDescriptor
455::lookupRTPInterface(unsigned char streamChannelId) {
456 char const* lookupArg = (char const*)(long)streamChannelId;
457 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
458}
459
460void SocketDescriptor
461::deregisterRTPInterface(unsigned char streamChannelId) {
462#if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
463 fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId);
464#endif
465 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
466
467 if (fSubChannelHashTable->IsEmpty()) {
468 // No more interfaces are using us, so it's curtains for us now:
469 if (fAreInReadHandlerLoop) {
470 fDeleteMyselfNext = True; // we can't delete ourself yet, but we'll do so from "tcpReadHandler()" below
471 } else {
472 delete this;
473 }
474 }
475}
476
477void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
478 // Call the read handler until it returns false, with a limit to avoid starving other sockets
479 unsigned count = 2000;
480 socketDescriptor->fAreInReadHandlerLoop = True;
481 while (!socketDescriptor->fDeleteMyselfNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {}
482 socketDescriptor->fAreInReadHandlerLoop = False;
483 if (socketDescriptor->fDeleteMyselfNext) delete socketDescriptor;
484}
485
// Performs one step of parsing the data that arrives on the TCP socket.
// In every state except AWAITING_PACKET_DATA, exactly one byte is read from the socket and
// interpreted according to the current state.  In AWAITING_PACKET_DATA, the packet data is
// consumed by the read handler of the "RTPInterface" for the current channel (or is skipped,
// one byte per call, if that interface has no read handler).
// Returns True if the caller should call this function again right away; False if there was
// no more data to read, a read error occurred (in which case "fReadErrorOccurred" and
// "fDeleteMyselfNext" are set), or control was just handed to a packet read handler.
Boolean SocketDescriptor::tcpReadHandler1(int mask) {
  // We expect the following data over the TCP channel:
  //   optional RTSP command or response bytes (before the first '$' character)
  //   a '$' character
  //   a 1-byte channel id
  //   a 2-byte packet size (in network byte order)
  //   the packet data.
  // However, because the socket is being read asynchronously, this data might arrive in pieces.

  u_int8_t c;
  struct sockaddr_in fromAddress;
  if (fTCPReadingState != AWAITING_PACKET_DATA) {
    // Every framing state consumes exactly one byte:
    int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
    if (result == 0) { // There was no more data to read
      return False;
    } else if (result != 1) { // error reading TCP socket, so we will no longer handle it
#ifdef DEBUG_RECEIVE
      fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
#endif
      fReadErrorOccurred = True;
      fDeleteMyselfNext = True;
      return False;
    }
  }

  Boolean callAgain = True;
  switch (fTCPReadingState) {
    case AWAITING_DOLLAR: {
      if (c == '$') {
#ifdef DEBUG_RECEIVE
	fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum);
#endif
	fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
      } else {
	// This character is part of a RTSP request or command, which is handled separately:
	if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) {
	  // Hack: 0xFF and 0xFE are used as special signaling characters, so don't send them
	  (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
	}
      }
      break;
    }
    case AWAITING_STREAM_CHANNEL_ID: {
      // The byte that we read is the stream channel id.
      if (lookupRTPInterface(c) != NULL) { // sanity check
	fStreamChannelId = c;
	fTCPReadingState = AWAITING_SIZE1;
      } else {
	// This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover:
#ifdef DEBUG_RECEIVE
	fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c);
#endif
	fTCPReadingState = AWAITING_DOLLAR;
      }
      break;
    }
    case AWAITING_SIZE1: {
      // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
      fSizeByte1 = c;
      fTCPReadingState = AWAITING_SIZE2;
      break;
    }
    case AWAITING_SIZE2: {
      // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
      unsigned short size = (fSizeByte1<<8)|c;

      // Record the information about the packet data that will be read next:
      // (The interface is looked up again, because it might have been deregistered since we
      //  saw the channel id.  The state advances either way.)
      RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
      if (rtpInterface != NULL) {
	rtpInterface->fNextTCPReadSize = size;
	rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
	rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
      }
      fTCPReadingState = AWAITING_PACKET_DATA;
      break;
    }
    case AWAITING_PACKET_DATA: {
      callAgain = False;
      fTCPReadingState = AWAITING_DOLLAR; // the next state, unless we end up having to read more data in the current state
      // Call the appropriate read handler to get the packet data from the TCP stream:
      RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
      if (rtpInterface != NULL) {
	if (rtpInterface->fNextTCPReadSize == 0) {
	  // We've already read all the data for this packet.
	  break;
	}
	if (rtpInterface->fReadHandlerProc != NULL) {
#ifdef DEBUG_RECEIVE
	  fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
#endif
	  // Stay in this state; the zero-size check above moves us on once the packet is complete:
	  fTCPReadingState = AWAITING_PACKET_DATA;
	  rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
	} else {
#ifdef DEBUG_RECEIVE
	  fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize);
#endif
	  // Nobody wants this packet's data, so discard it - one byte per call:
	  int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
	  if (result < 0) { // error reading TCP socket, so we will no longer handle it
#ifdef DEBUG_RECEIVE
	    fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
#endif
	    fReadErrorOccurred = True;
	    fDeleteMyselfNext = True;
	    return False;
	  } else {
	    fTCPReadingState = AWAITING_PACKET_DATA;
	    if (result == 1) {
	      // One byte was skipped; ask to be called again, to skip the next one:
	      --rtpInterface->fNextTCPReadSize;
	      callAgain = True;
	    }
	  }
	}
      }
#ifdef DEBUG_RECEIVE
      else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId);
#endif
    }
  }

  return callAgain;
}
607
608
609////////// tcpStreamRecord implementation //////////
610
// Creates a record of the (streamSocketNum, streamChannelId) pair, linked in front of "next".
// Note: The new record takes ownership of "next"; the destructor deletes it.
tcpStreamRecord
::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
		  tcpStreamRecord* next)
  : fNext(next),
    fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
}
617
// Deletes the rest of the list as well (each record deletes its successor).
// To delete just one record, set its "fNext" to NULL first - as "RTPInterface::removeStreamSocket()" does.
tcpStreamRecord::~tcpStreamRecord() {
  delete fNext;
}
621