1 | /********** |
2 | This library is free software; you can redistribute it and/or modify it under |
3 | the terms of the GNU Lesser General Public License as published by the |
4 | Free Software Foundation; either version 3 of the License, or (at your |
5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
6 | |
7 | This library is distributed in the hope that it will be useful, but WITHOUT |
8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
10 | more details. |
11 | |
12 | You should have received a copy of the GNU Lesser General Public License |
13 | along with this library; if not, write to the Free Software Foundation, Inc., |
14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
15 | **********/ |
16 | // "liveMedia" |
17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
// A class that can be used to create (possibly multiple) 'replicas' of an incoming stream.
19 | // Implementation. |
20 | |
21 | #include "StreamReplicator.hh" |
22 | |
23 | ////////// Definition of "StreamReplica": The class that implements each stream replica ////////// |
24 | |
25 | class StreamReplica: public FramedSource { |
26 | protected: |
27 | friend class StreamReplicator; |
28 | StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()" |
29 | virtual ~StreamReplica(); |
30 | |
31 | private: // redefined virtual functions: |
32 | virtual void doGetNextFrame(); |
33 | virtual void doStopGettingFrames(); |
34 | |
35 | private: |
36 | static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica); |
37 | |
38 | private: |
39 | StreamReplicator& fOurReplicator; |
40 | int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing |
41 | |
42 | // Replicas that are currently awaiting data are kept in a (singly-linked) list: |
43 | StreamReplica* fNext; |
44 | }; |
45 | |
46 | |
47 | ////////// StreamReplicator implementation ////////// |
48 | |
49 | StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) { |
50 | return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies); |
51 | } |
52 | |
53 | StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) |
54 | : Medium(env), |
55 | fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False), |
56 | fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0), |
57 | fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) { |
58 | } |
59 | |
60 | StreamReplicator::~StreamReplicator() { |
61 | Medium::close(fInputSource); |
62 | } |
63 | |
64 | FramedSource* StreamReplicator::createStreamReplica() { |
65 | ++fNumReplicas; |
66 | return new StreamReplica(*this); |
67 | } |
68 | |
69 | void StreamReplicator::getNextFrame(StreamReplica* replica) { |
70 | if (fInputSourceHasClosed) { // handle closure instead |
71 | replica->handleClosure(); |
72 | return; |
73 | } |
74 | |
75 | if (replica->fFrameIndex == -1) { |
76 | // This replica had stopped playing (or had just been created), but is now actively reading. Note this: |
77 | replica->fFrameIndex = fFrameIndex; |
78 | ++fNumActiveReplicas; |
79 | } |
80 | |
81 | if (fMasterReplica == NULL) { |
82 | // This is the first replica to request the next unread frame. Make it the 'master' replica - meaning that we read the frame |
83 | // into its buffer, and then copy from this into the other replicas' buffers. |
84 | fMasterReplica = replica; |
85 | |
86 | // Arrange to read the next frame into this replica's buffer: |
87 | if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
88 | afterGettingFrame, this, onSourceClosure, this); |
89 | } else if (replica->fFrameIndex != fFrameIndex) { |
90 | // This replica is already asking for the next frame (because it has already received the current frame). Enqueue it: |
91 | replica->fNext = fReplicasAwaitingNextFrame; |
92 | fReplicasAwaitingNextFrame = replica; |
93 | } else { |
94 | // This replica is asking for the current frame. Enqueue it: |
95 | replica->fNext = fReplicasAwaitingCurrentFrame; |
96 | fReplicasAwaitingCurrentFrame = replica; |
97 | |
98 | if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) { |
99 | // The current frame has already arrived, so deliver it to this replica now: |
100 | deliverReceivedFrame(); |
101 | } |
102 | } |
103 | } |
104 | |
105 | void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) { |
106 | if (replicaBeingDeactivated->fFrameIndex == -1) return; // this replica has already been deactivated (or was never activated at all) |
107 | |
108 | // Assert: fNumActiveReplicas > 0 |
109 | if (fNumActiveReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n" ); // should not happen |
110 | --fNumActiveReplicas; |
111 | |
112 | // Forget about any frame delivery that might have just been made to this replica: |
113 | if (replicaBeingDeactivated->fFrameIndex != fFrameIndex && fNumDeliveriesMadeSoFar > 0) --fNumDeliveriesMadeSoFar; |
114 | |
115 | replicaBeingDeactivated->fFrameIndex = -1; |
116 | |
117 | // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame: |
118 | if (replicaBeingDeactivated == fMasterReplica) { |
119 | // We need to replace the 'master replica', if we can: |
120 | if (fReplicasAwaitingCurrentFrame == NULL) { |
121 | // There's currently no replacement 'master replica' |
122 | fMasterReplica = NULL; |
123 | } else { |
124 | // There's another replica that we can use as a replacement 'master replica': |
125 | fMasterReplica = fReplicasAwaitingCurrentFrame; |
126 | fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext; |
127 | fMasterReplica->fNext = NULL; |
128 | } |
129 | |
130 | // Check whether the read into the old master replica's buffer is still pending, or has completed: |
131 | if (fInputSource != NULL) { |
132 | if (fInputSource->isCurrentlyAwaitingData()) { |
133 | // We have a pending read into the old master replica's buffer. |
134 | // We need to stop it, and retry the read with a new master (if available) |
135 | fInputSource->stopGettingFrames(); |
136 | |
137 | if (fMasterReplica != NULL) { |
138 | fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
139 | afterGettingFrame, this, onSourceClosure, this); |
140 | } |
141 | } else { |
142 | // The read into the old master replica's buffer has already completed. Copy the data to the new master replica (if any): |
143 | if (fMasterReplica != NULL) { |
144 | StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated); |
145 | } else { |
146 | // We don't have a new master replica, so we can't copy the received frame to any new replica that might ask for it. |
147 | // Fortunately this should be a very rare occurrence. |
148 | } |
149 | } |
150 | } |
151 | } else { |
152 | // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues: |
153 | if (fReplicasAwaitingCurrentFrame != NULL) { |
154 | if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) { |
155 | fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext; |
156 | replicaBeingDeactivated->fNext = NULL; |
157 | } |
158 | else { |
159 | for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) { |
160 | if (r1->fNext == replicaBeingDeactivated) { |
161 | r1->fNext = replicaBeingDeactivated->fNext; |
162 | replicaBeingDeactivated->fNext = NULL; |
163 | break; |
164 | } |
165 | } |
166 | } |
167 | } |
168 | if (fReplicasAwaitingNextFrame != NULL) { |
169 | if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) { |
170 | fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext; |
171 | replicaBeingDeactivated->fNext = NULL; |
172 | } |
173 | else { |
174 | for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) { |
175 | if (r2->fNext == replicaBeingDeactivated) { |
176 | r2->fNext = replicaBeingDeactivated->fNext; |
177 | replicaBeingDeactivated->fNext = NULL; |
178 | break; |
179 | } |
180 | } |
181 | } |
182 | } |
183 | |
184 | // Check for the possibility that - now that a replica has been deactivated - all other |
185 | // replicas have received the current frame, and so now we need to complete delivery to |
186 | // the master replica: |
187 | if (fMasterReplica != NULL && fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) deliverReceivedFrame(); |
188 | } |
189 | |
190 | if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too |
191 | } |
192 | |
193 | void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) { |
194 | // First, handle the replica that's being removed the same way that we would if it were merely being deactivated: |
195 | deactivateStreamReplica(replicaBeingRemoved); |
196 | |
197 | // Assert: fNumReplicas > 0 |
198 | if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n" ); // should not happen |
199 | --fNumReplicas; |
200 | |
201 | // If this was the last replica, then delete ourselves (if we were set up to do so): |
202 | if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) { |
203 | Medium::close(this); |
204 | return; |
205 | } |
206 | } |
207 | |
208 | void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, |
209 | struct timeval presentationTime, unsigned durationInMicroseconds) { |
210 | ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); |
211 | } |
212 | |
213 | void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, |
214 | struct timeval presentationTime, unsigned durationInMicroseconds) { |
215 | // The frame was read into our master replica's buffer. Update the master replica's state, but don't complete delivery to it |
216 | // just yet. We do that later, after we're sure that we've delivered it to all other replicas. |
217 | fMasterReplica->fFrameSize = frameSize; |
218 | fMasterReplica->fNumTruncatedBytes = numTruncatedBytes; |
219 | fMasterReplica->fPresentationTime = presentationTime; |
220 | fMasterReplica->fDurationInMicroseconds = durationInMicroseconds; |
221 | |
222 | deliverReceivedFrame(); |
223 | } |
224 | |
225 | void StreamReplicator::onSourceClosure(void* clientData) { |
226 | ((StreamReplicator*)clientData)->onSourceClosure(); |
227 | } |
228 | |
229 | void StreamReplicator::onSourceClosure() { |
230 | fInputSourceHasClosed = True; |
231 | |
232 | // Signal the closure to each replica that is currently awaiting a frame: |
233 | StreamReplica* replica; |
234 | while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
235 | fReplicasAwaitingCurrentFrame = replica->fNext; |
236 | replica->fNext = NULL; |
237 | replica->handleClosure(); |
238 | } |
239 | while ((replica = fReplicasAwaitingNextFrame) != NULL) { |
240 | fReplicasAwaitingNextFrame = replica->fNext; |
241 | replica->fNext = NULL; |
242 | replica->handleClosure(); |
243 | } |
244 | if ((replica = fMasterReplica) != NULL) { |
245 | fMasterReplica = NULL; |
246 | replica->handleClosure(); |
247 | } |
248 | } |
249 | |
250 | void StreamReplicator::deliverReceivedFrame() { |
251 | // The 'master replica' has received its copy of the current frame. |
252 | // Copy it (and complete delivery) to any other replica that has requested this frame. |
253 | // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself. |
254 | StreamReplica* replica; |
255 | while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
256 | fReplicasAwaitingCurrentFrame = replica->fNext; |
257 | replica->fNext = NULL; |
258 | |
259 | // Assert: fMasterReplica != NULL |
260 | if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n" ); // shouldn't happen |
261 | StreamReplica::copyReceivedFrame(replica, fMasterReplica); |
262 | replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
263 | ++fNumDeliveriesMadeSoFar; |
264 | |
265 | // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to |
266 | if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n" , fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen |
267 | |
268 | // Complete delivery to this replica: |
269 | FramedSource::afterGetting(replica); |
270 | } |
271 | |
272 | if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) { |
273 | // No more requests for this frame are expected, so complete delivery to the 'master replica': |
274 | replica = fMasterReplica; |
275 | fMasterReplica = NULL; |
276 | replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
277 | fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame |
278 | fNumDeliveriesMadeSoFar = 0; // reset for the next frame |
279 | |
280 | if (fReplicasAwaitingNextFrame != NULL) { |
281 | // One of the other replicas has already requested the next frame, so make it the next 'master replica': |
282 | fMasterReplica = fReplicasAwaitingNextFrame; |
283 | fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext; |
284 | fMasterReplica->fNext = NULL; |
285 | |
286 | // Arrange to read the next frame into this replica's buffer: |
287 | if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
288 | afterGettingFrame, this, onSourceClosure, this); |
289 | } |
290 | |
291 | // Move any other replicas that had already requested the next frame to the 'requesting current frame' list: |
292 | // Assert: fReplicasAwaitingCurrentFrame == NULL; |
293 | if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n" ); // should not happen |
294 | fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame; |
295 | fReplicasAwaitingNextFrame = NULL; |
296 | |
297 | // Complete delivery to the 'master' replica (thereby completing all deliveries for this frame): |
298 | FramedSource::afterGetting(replica); |
299 | } |
300 | } |
301 | |
302 | |
303 | ////////// StreamReplica implementation ////////// |
304 | |
305 | StreamReplica::StreamReplica(StreamReplicator& ourReplicator) |
306 | : FramedSource(ourReplicator.envir()), |
307 | fOurReplicator(ourReplicator), |
308 | fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) { |
309 | } |
310 | |
311 | StreamReplica::~StreamReplica() { |
312 | fOurReplicator.removeStreamReplica(this); |
313 | } |
314 | |
315 | void StreamReplica::doGetNextFrame() { |
316 | fOurReplicator.getNextFrame(this); |
317 | } |
318 | |
319 | void StreamReplica::doStopGettingFrames() { |
320 | fOurReplicator.deactivateStreamReplica(this); |
321 | } |
322 | |
323 | void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) { |
324 | // First, figure out how much data to copy. ("toReplica" might have a smaller buffer than "fromReplica".) |
325 | unsigned numNewBytesToTruncate |
326 | = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0; |
327 | toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate; |
328 | toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate; |
329 | |
330 | memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize); |
331 | toReplica->fPresentationTime = fromReplica->fPresentationTime; |
332 | toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds; |
333 | } |
334 | |