1/**********
2This library is free software; you can redistribute it and/or modify it under
3the terms of the GNU Lesser General Public License as published by the
4Free Software Foundation; either version 3 of the License, or (at your
5option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7This library is distributed in the hope that it will be useful, but WITHOUT
8ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10more details.
11
12You should have received a copy of the GNU Lesser General Public License
13along with this library; if not, write to the Free Software Foundation, Inc.,
1451 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15**********/
16// "liveMedia"
17// Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
// A class that can be used to create (possibly multiple) 'replicas' of an incoming stream.
19// Implementation.
20
21#include "StreamReplicator.hh"
22
23////////// Definition of "StreamReplica": The class that implements each stream replica //////////
24
25class StreamReplica: public FramedSource {
26protected:
27 friend class StreamReplicator;
28 StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()"
29 virtual ~StreamReplica();
30
31private: // redefined virtual functions:
32 virtual void doGetNextFrame();
33 virtual void doStopGettingFrames();
34
35private:
36 static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica);
37
38private:
39 StreamReplicator& fOurReplicator;
40 int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing
41
42 // Replicas that are currently awaiting data are kept in a (singly-linked) list:
43 StreamReplica* fNext;
44};
45
46
47////////// StreamReplicator implementation //////////
48
49StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
50 return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
51}
52
53StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
54 : Medium(env),
55 fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False),
56 fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0),
57 fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
58}
59
60StreamReplicator::~StreamReplicator() {
61 Medium::close(fInputSource);
62}
63
64FramedSource* StreamReplicator::createStreamReplica() {
65 ++fNumReplicas;
66 return new StreamReplica(*this);
67}
68
69void StreamReplicator::getNextFrame(StreamReplica* replica) {
70 if (fInputSourceHasClosed) { // handle closure instead
71 replica->handleClosure();
72 return;
73 }
74
75 if (replica->fFrameIndex == -1) {
76 // This replica had stopped playing (or had just been created), but is now actively reading. Note this:
77 replica->fFrameIndex = fFrameIndex;
78 ++fNumActiveReplicas;
79 }
80
81 if (fMasterReplica == NULL) {
82 // This is the first replica to request the next unread frame. Make it the 'master' replica - meaning that we read the frame
83 // into its buffer, and then copy from this into the other replicas' buffers.
84 fMasterReplica = replica;
85
86 // Arrange to read the next frame into this replica's buffer:
87 if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
88 afterGettingFrame, this, onSourceClosure, this);
89 } else if (replica->fFrameIndex != fFrameIndex) {
90 // This replica is already asking for the next frame (because it has already received the current frame). Enqueue it:
91 replica->fNext = fReplicasAwaitingNextFrame;
92 fReplicasAwaitingNextFrame = replica;
93 } else {
94 // This replica is asking for the current frame. Enqueue it:
95 replica->fNext = fReplicasAwaitingCurrentFrame;
96 fReplicasAwaitingCurrentFrame = replica;
97
98 if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) {
99 // The current frame has already arrived, so deliver it to this replica now:
100 deliverReceivedFrame();
101 }
102 }
103}
104
105void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) {
106 if (replicaBeingDeactivated->fFrameIndex == -1) return; // this replica has already been deactivated (or was never activated at all)
107
108 // Assert: fNumActiveReplicas > 0
109 if (fNumActiveReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n"); // should not happen
110 --fNumActiveReplicas;
111
112 // Forget about any frame delivery that might have just been made to this replica:
113 if (replicaBeingDeactivated->fFrameIndex != fFrameIndex && fNumDeliveriesMadeSoFar > 0) --fNumDeliveriesMadeSoFar;
114
115 replicaBeingDeactivated->fFrameIndex = -1;
116
117 // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame:
118 if (replicaBeingDeactivated == fMasterReplica) {
119 // We need to replace the 'master replica', if we can:
120 if (fReplicasAwaitingCurrentFrame == NULL) {
121 // There's currently no replacement 'master replica'
122 fMasterReplica = NULL;
123 } else {
124 // There's another replica that we can use as a replacement 'master replica':
125 fMasterReplica = fReplicasAwaitingCurrentFrame;
126 fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext;
127 fMasterReplica->fNext = NULL;
128 }
129
130 // Check whether the read into the old master replica's buffer is still pending, or has completed:
131 if (fInputSource != NULL) {
132 if (fInputSource->isCurrentlyAwaitingData()) {
133 // We have a pending read into the old master replica's buffer.
134 // We need to stop it, and retry the read with a new master (if available)
135 fInputSource->stopGettingFrames();
136
137 if (fMasterReplica != NULL) {
138 fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
139 afterGettingFrame, this, onSourceClosure, this);
140 }
141 } else {
142 // The read into the old master replica's buffer has already completed. Copy the data to the new master replica (if any):
143 if (fMasterReplica != NULL) {
144 StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated);
145 } else {
146 // We don't have a new master replica, so we can't copy the received frame to any new replica that might ask for it.
147 // Fortunately this should be a very rare occurrence.
148 }
149 }
150 }
151 } else {
152 // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues:
153 if (fReplicasAwaitingCurrentFrame != NULL) {
154 if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) {
155 fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext;
156 replicaBeingDeactivated->fNext = NULL;
157 }
158 else {
159 for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) {
160 if (r1->fNext == replicaBeingDeactivated) {
161 r1->fNext = replicaBeingDeactivated->fNext;
162 replicaBeingDeactivated->fNext = NULL;
163 break;
164 }
165 }
166 }
167 }
168 if (fReplicasAwaitingNextFrame != NULL) {
169 if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) {
170 fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext;
171 replicaBeingDeactivated->fNext = NULL;
172 }
173 else {
174 for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) {
175 if (r2->fNext == replicaBeingDeactivated) {
176 r2->fNext = replicaBeingDeactivated->fNext;
177 replicaBeingDeactivated->fNext = NULL;
178 break;
179 }
180 }
181 }
182 }
183
184 // Check for the possibility that - now that a replica has been deactivated - all other
185 // replicas have received the current frame, and so now we need to complete delivery to
186 // the master replica:
187 if (fMasterReplica != NULL && fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) deliverReceivedFrame();
188 }
189
190 if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too
191}
192
193void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
194 // First, handle the replica that's being removed the same way that we would if it were merely being deactivated:
195 deactivateStreamReplica(replicaBeingRemoved);
196
197 // Assert: fNumReplicas > 0
198 if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen
199 --fNumReplicas;
200
201 // If this was the last replica, then delete ourselves (if we were set up to do so):
202 if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) {
203 Medium::close(this);
204 return;
205 }
206}
207
208void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
209 struct timeval presentationTime, unsigned durationInMicroseconds) {
210 ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
211}
212
213void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
214 struct timeval presentationTime, unsigned durationInMicroseconds) {
215 // The frame was read into our master replica's buffer. Update the master replica's state, but don't complete delivery to it
216 // just yet. We do that later, after we're sure that we've delivered it to all other replicas.
217 fMasterReplica->fFrameSize = frameSize;
218 fMasterReplica->fNumTruncatedBytes = numTruncatedBytes;
219 fMasterReplica->fPresentationTime = presentationTime;
220 fMasterReplica->fDurationInMicroseconds = durationInMicroseconds;
221
222 deliverReceivedFrame();
223}
224
225void StreamReplicator::onSourceClosure(void* clientData) {
226 ((StreamReplicator*)clientData)->onSourceClosure();
227}
228
229void StreamReplicator::onSourceClosure() {
230 fInputSourceHasClosed = True;
231
232 // Signal the closure to each replica that is currently awaiting a frame:
233 StreamReplica* replica;
234 while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
235 fReplicasAwaitingCurrentFrame = replica->fNext;
236 replica->fNext = NULL;
237 replica->handleClosure();
238 }
239 while ((replica = fReplicasAwaitingNextFrame) != NULL) {
240 fReplicasAwaitingNextFrame = replica->fNext;
241 replica->fNext = NULL;
242 replica->handleClosure();
243 }
244 if ((replica = fMasterReplica) != NULL) {
245 fMasterReplica = NULL;
246 replica->handleClosure();
247 }
248}
249
250void StreamReplicator::deliverReceivedFrame() {
251 // The 'master replica' has received its copy of the current frame.
252 // Copy it (and complete delivery) to any other replica that has requested this frame.
253 // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself.
254 StreamReplica* replica;
255 while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
256 fReplicasAwaitingCurrentFrame = replica->fNext;
257 replica->fNext = NULL;
258
259 // Assert: fMasterReplica != NULL
260 if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"); // shouldn't happen
261 StreamReplica::copyReceivedFrame(replica, fMasterReplica);
262 replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
263 ++fNumDeliveriesMadeSoFar;
264
265 // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to
266 if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen
267
268 // Complete delivery to this replica:
269 FramedSource::afterGetting(replica);
270 }
271
272 if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) {
273 // No more requests for this frame are expected, so complete delivery to the 'master replica':
274 replica = fMasterReplica;
275 fMasterReplica = NULL;
276 replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
277 fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame
278 fNumDeliveriesMadeSoFar = 0; // reset for the next frame
279
280 if (fReplicasAwaitingNextFrame != NULL) {
281 // One of the other replicas has already requested the next frame, so make it the next 'master replica':
282 fMasterReplica = fReplicasAwaitingNextFrame;
283 fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext;
284 fMasterReplica->fNext = NULL;
285
286 // Arrange to read the next frame into this replica's buffer:
287 if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
288 afterGettingFrame, this, onSourceClosure, this);
289 }
290
291 // Move any other replicas that had already requested the next frame to the 'requesting current frame' list:
292 // Assert: fReplicasAwaitingCurrentFrame == NULL;
293 if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"); // should not happen
294 fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame;
295 fReplicasAwaitingNextFrame = NULL;
296
297 // Complete delivery to the 'master' replica (thereby completing all deliveries for this frame):
298 FramedSource::afterGetting(replica);
299 }
300}
301
302
303////////// StreamReplica implementation //////////
304
305StreamReplica::StreamReplica(StreamReplicator& ourReplicator)
306 : FramedSource(ourReplicator.envir()),
307 fOurReplicator(ourReplicator),
308 fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) {
309}
310
311StreamReplica::~StreamReplica() {
312 fOurReplicator.removeStreamReplica(this);
313}
314
315void StreamReplica::doGetNextFrame() {
316 fOurReplicator.getNextFrame(this);
317}
318
319void StreamReplica::doStopGettingFrames() {
320 fOurReplicator.deactivateStreamReplica(this);
321}
322
323void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) {
324 // First, figure out how much data to copy. ("toReplica" might have a smaller buffer than "fromReplica".)
325 unsigned numNewBytesToTruncate
326 = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0;
327 toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate;
328 toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate;
329
330 memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize);
331 toReplica->fPresentationTime = fromReplica->fPresentationTime;
332 toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds;
333}
334