1 | /********** |
2 | This library is free software; you can redistribute it and/or modify it under |
3 | the terms of the GNU Lesser General Public License as published by the |
4 | Free Software Foundation; either version 3 of the License, or (at your |
5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
6 | |
7 | This library is distributed in the hope that it will be useful, but WITHOUT |
8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
10 | more details. |
11 | |
12 | You should have received a copy of the GNU Lesser General Public License |
13 | along with this library; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
15 | **********/ |
16 | // "liveMedia" |
17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
18 | // A sink representing a TCP output stream |
19 | // Implementation |
20 | |
21 | #include "TCPStreamSink.hh" |
22 | #include <GroupsockHelper.hh> // for "ignoreSigPipeOnSocket()" |
23 | |
24 | TCPStreamSink* TCPStreamSink::createNew(UsageEnvironment& env, int socketNum) { |
25 | return new TCPStreamSink(env, socketNum); |
26 | } |
27 | |
28 | TCPStreamSink::TCPStreamSink(UsageEnvironment& env, int socketNum) |
29 | : MediaSink(env), |
30 | fUnwrittenBytesStart(0), fUnwrittenBytesEnd(0), |
31 | fInputSourceIsOpen(False), fOutputSocketIsWritable(True), |
32 | fOutputSocketNum(socketNum) { |
33 | ignoreSigPipeOnSocket(socketNum); |
34 | } |
35 | |
36 | TCPStreamSink::~TCPStreamSink() { |
37 | // Turn off any pending background handling of our output socket: |
38 | envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); |
39 | } |
40 | |
41 | Boolean TCPStreamSink::continuePlaying() { |
42 | fInputSourceIsOpen = fSource != NULL; |
43 | processBuffer(); |
44 | |
45 | return True; |
46 | } |
47 | |
48 | #define TCP_STREAM_SINK_MIN_READ_SIZE 1000 |
49 | |
50 | void TCPStreamSink::processBuffer() { |
51 | // First, try writing data to our output socket, if we can: |
52 | if (fOutputSocketIsWritable && numUnwrittenBytes() > 0) { |
53 | int numBytesWritten |
54 | = send(fOutputSocketNum, (const char*)&fBuffer[fUnwrittenBytesStart], numUnwrittenBytes(), 0); |
55 | if (numBytesWritten < (int)numUnwrittenBytes()) { |
56 | // The output socket is no longer writable. Set a handler to be called when it becomes writable again. |
57 | fOutputSocketIsWritable = False; |
58 | if (envir().getErrno() != EPIPE) { // on this error, the socket might still be writable, but no longer usable |
59 | envir().taskScheduler().setBackgroundHandling(fOutputSocketNum, SOCKET_WRITABLE, socketWritableHandler, this); |
60 | } |
61 | } |
62 | if (numBytesWritten > 0) { |
63 | // We wrote at least some of our data. Update our buffer pointers: |
64 | fUnwrittenBytesStart += numBytesWritten; |
65 | if (fUnwrittenBytesStart > fUnwrittenBytesEnd) fUnwrittenBytesStart = fUnwrittenBytesEnd; // sanity check |
66 | if (fUnwrittenBytesStart == fUnwrittenBytesEnd && (!fInputSourceIsOpen || !fSource->isCurrentlyAwaitingData())) { |
67 | fUnwrittenBytesStart = fUnwrittenBytesEnd = 0; // reset the buffer to empty |
68 | } |
69 | } |
70 | } |
71 | |
72 | // Then, read from our input source, if we can (& we're not already reading from it): |
73 | if (fInputSourceIsOpen && freeBufferSpace() >= TCP_STREAM_SINK_MIN_READ_SIZE && !fSource->isCurrentlyAwaitingData()) { |
74 | fSource->getNextFrame(&fBuffer[fUnwrittenBytesEnd], freeBufferSpace(), afterGettingFrame, this, ourOnSourceClosure, this); |
75 | } else if (!fInputSourceIsOpen && numUnwrittenBytes() == 0) { |
76 | // We're now done: |
77 | onSourceClosure(); |
78 | } |
79 | } |
80 | |
81 | void TCPStreamSink::socketWritableHandler(void* clientData, int /*mask*/) { |
82 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
83 | sink->socketWritableHandler1(); |
84 | } |
85 | |
86 | void TCPStreamSink::socketWritableHandler1() { |
87 | envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); // disable this handler until the next time it's needed |
88 | |
89 | fOutputSocketIsWritable = True; |
90 | processBuffer(); |
91 | } |
92 | |
93 | void TCPStreamSink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, |
94 | struct timeval /*presentationTime*/, unsigned /*durationInMicroseconds*/) { |
95 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
96 | sink->afterGettingFrame(frameSize, numTruncatedBytes); |
97 | } |
98 | |
99 | void TCPStreamSink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes) { |
100 | if (numTruncatedBytes > 0) { |
101 | envir() << "TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer. " |
102 | << numTruncatedBytes |
103 | << " bytes of trailing data was dropped! Correct this by increasing the definition of \"TCP_STREAM_SINK_BUFFER_SIZE\" in \"include/TCPStreamSink.hh\".\n" ; |
104 | } |
105 | fUnwrittenBytesEnd += frameSize; |
106 | processBuffer(); |
107 | } |
108 | |
109 | void TCPStreamSink::ourOnSourceClosure(void* clientData) { |
110 | TCPStreamSink* sink = (TCPStreamSink*)clientData; |
111 | sink->ourOnSourceClosure1(); |
112 | } |
113 | |
114 | void TCPStreamSink::ourOnSourceClosure1() { |
115 | // The input source has closed: |
116 | fInputSourceIsOpen = False; |
117 | processBuffer(); |
118 | } |
119 | |