| 1 | /********** |
| 2 | This library is free software; you can redistribute it and/or modify it under |
| 3 | the terms of the GNU Lesser General Public License as published by the |
| 4 | Free Software Foundation; either version 3 of the License, or (at your |
| 5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
| 6 | |
| 7 | This library is distributed in the hope that it will be useful, but WITHOUT |
| 8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
| 10 | more details. |
| 11 | |
| 12 | You should have received a copy of the GNU Lesser General Public License |
| 13 | along with this library; if not, write to the Free Software Foundation, Inc., |
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 15 | **********/ |
| 16 | // "liveMedia" |
| 17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
| 18 | // A sink representing a TCP output stream |
| 19 | // Implementation |
| 20 | |
| 21 | #include "TCPStreamSink.hh" |
| 22 | #include <GroupsockHelper.hh> // for "ignoreSigPipeOnSocket()" |
| 23 | |
| 24 | TCPStreamSink* TCPStreamSink::createNew(UsageEnvironment& env, int socketNum) { |
| 25 | return new TCPStreamSink(env, socketNum); |
| 26 | } |
| 27 | |
| 28 | TCPStreamSink::TCPStreamSink(UsageEnvironment& env, int socketNum) |
| 29 | : MediaSink(env), |
| 30 | fUnwrittenBytesStart(0), fUnwrittenBytesEnd(0), |
| 31 | fInputSourceIsOpen(False), fOutputSocketIsWritable(True), |
| 32 | fOutputSocketNum(socketNum) { |
| 33 | ignoreSigPipeOnSocket(socketNum); |
| 34 | } |
| 35 | |
| 36 | TCPStreamSink::~TCPStreamSink() { |
| 37 | // Turn off any pending background handling of our output socket: |
| 38 | envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); |
| 39 | } |
| 40 | |
| 41 | Boolean TCPStreamSink::continuePlaying() { |
| 42 | fInputSourceIsOpen = fSource != NULL; |
| 43 | processBuffer(); |
| 44 | |
| 45 | return True; |
| 46 | } |
| 47 | |
| 48 | #define TCP_STREAM_SINK_MIN_READ_SIZE 1000 |
| 49 | |
| 50 | void TCPStreamSink::processBuffer() { |
| 51 | // First, try writing data to our output socket, if we can: |
| 52 | if (fOutputSocketIsWritable && numUnwrittenBytes() > 0) { |
| 53 | int numBytesWritten |
| 54 | = send(fOutputSocketNum, (const char*)&fBuffer[fUnwrittenBytesStart], numUnwrittenBytes(), 0); |
| 55 | if (numBytesWritten < (int)numUnwrittenBytes()) { |
| 56 | // The output socket is no longer writable. Set a handler to be called when it becomes writable again. |
| 57 | fOutputSocketIsWritable = False; |
| 58 | if (envir().getErrno() != EPIPE) { // on this error, the socket might still be writable, but no longer usable |
| 59 | envir().taskScheduler().setBackgroundHandling(fOutputSocketNum, SOCKET_WRITABLE, socketWritableHandler, this); |
| 60 | } |
| 61 | } |
| 62 | if (numBytesWritten > 0) { |
| 63 | // We wrote at least some of our data. Update our buffer pointers: |
| 64 | fUnwrittenBytesStart += numBytesWritten; |
| 65 | if (fUnwrittenBytesStart > fUnwrittenBytesEnd) fUnwrittenBytesStart = fUnwrittenBytesEnd; // sanity check |
| 66 | if (fUnwrittenBytesStart == fUnwrittenBytesEnd && (!fInputSourceIsOpen || !fSource->isCurrentlyAwaitingData())) { |
| 67 | fUnwrittenBytesStart = fUnwrittenBytesEnd = 0; // reset the buffer to empty |
| 68 | } |
| 69 | } |
| 70 | } |
| 71 | |
| 72 | // Then, read from our input source, if we can (& we're not already reading from it): |
| 73 | if (fInputSourceIsOpen && freeBufferSpace() >= TCP_STREAM_SINK_MIN_READ_SIZE && !fSource->isCurrentlyAwaitingData()) { |
| 74 | fSource->getNextFrame(&fBuffer[fUnwrittenBytesEnd], freeBufferSpace(), afterGettingFrame, this, ourOnSourceClosure, this); |
| 75 | } else if (!fInputSourceIsOpen && numUnwrittenBytes() == 0) { |
| 76 | // We're now done: |
| 77 | onSourceClosure(); |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | void TCPStreamSink::socketWritableHandler(void* clientData, int /*mask*/) { |
| 82 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
| 83 | sink->socketWritableHandler1(); |
| 84 | } |
| 85 | |
| 86 | void TCPStreamSink::socketWritableHandler1() { |
| 87 | envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); // disable this handler until the next time it's needed |
| 88 | |
| 89 | fOutputSocketIsWritable = True; |
| 90 | processBuffer(); |
| 91 | } |
| 92 | |
| 93 | void TCPStreamSink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, |
| 94 | struct timeval /*presentationTime*/, unsigned /*durationInMicroseconds*/) { |
| 95 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
| 96 | sink->afterGettingFrame(frameSize, numTruncatedBytes); |
| 97 | } |
| 98 | |
| 99 | void TCPStreamSink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes) { |
| 100 | if (numTruncatedBytes > 0) { |
| 101 | envir() << "TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer. " |
| 102 | << numTruncatedBytes |
| 103 | << " bytes of trailing data was dropped! Correct this by increasing the definition of \"TCP_STREAM_SINK_BUFFER_SIZE\" in \"include/TCPStreamSink.hh\".\n" ; |
| 104 | } |
| 105 | fUnwrittenBytesEnd += frameSize; |
| 106 | processBuffer(); |
| 107 | } |
| 108 | |
| 109 | void TCPStreamSink::ourOnSourceClosure(void* clientData) { |
| 110 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
| 111 | sink->ourOnSourceClosure1(); |
| 112 | } |
| 113 | |
| 114 | void TCPStreamSink::ourOnSourceClosure1() { |
| 115 | // The input source has closed: |
| 116 | fInputSourceIsOpen = False; |
| 117 | processBuffer(); |
| 118 | } |
| 119 | |