| 1 | /********** |
| 2 | This library is free software; you can redistribute it and/or modify it under |
| 3 | the terms of the GNU Lesser General Public License as published by the |
| 4 | Free Software Foundation; either version 3 of the License, or (at your |
| 5 | option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.) |
| 6 | |
| 7 | This library is distributed in the hope that it will be useful, but WITHOUT |
| 8 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 9 | FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for |
| 10 | more details. |
| 11 | |
| 12 | You should have received a copy of the GNU Lesser General Public License |
| 13 | along with this library; if not, write to the Free Software Foundation, Inc., |
| 14 | 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| 15 | **********/ |
| 16 | // "liveMedia" |
| 17 | // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved. |
| 18 | // An class that can be used to create (possibly multiple) 'replicas' of an incoming stream. |
| 19 | // Implementation. |
| 20 | |
| 21 | #include "StreamReplicator.hh" |
| 22 | |
| 23 | ////////// Definition of "StreamReplica": The class that implements each stream replica ////////// |
| 24 | |
| 25 | class StreamReplica: public FramedSource { |
| 26 | protected: |
| 27 | friend class StreamReplicator; |
| 28 | StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()" |
| 29 | virtual ~StreamReplica(); |
| 30 | |
| 31 | private: // redefined virtual functions: |
| 32 | virtual void doGetNextFrame(); |
| 33 | virtual void doStopGettingFrames(); |
| 34 | |
| 35 | private: |
| 36 | static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica); |
| 37 | |
| 38 | private: |
| 39 | StreamReplicator& fOurReplicator; |
| 40 | int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing |
| 41 | |
| 42 | // Replicas that are currently awaiting data are kept in a (singly-linked) list: |
| 43 | StreamReplica* fNext; |
| 44 | }; |
| 45 | |
| 46 | |
| 47 | ////////// StreamReplicator implementation ////////// |
| 48 | |
| 49 | StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) { |
| 50 | return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies); |
| 51 | } |
| 52 | |
| 53 | StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) |
| 54 | : Medium(env), |
| 55 | fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False), |
| 56 | fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0), |
| 57 | fFrameIndex(0), fMasterReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) { |
| 58 | } |
| 59 | |
| 60 | StreamReplicator::~StreamReplicator() { |
| 61 | Medium::close(fInputSource); |
| 62 | } |
| 63 | |
| 64 | FramedSource* StreamReplicator::createStreamReplica() { |
| 65 | ++fNumReplicas; |
| 66 | return new StreamReplica(*this); |
| 67 | } |
| 68 | |
| 69 | void StreamReplicator::getNextFrame(StreamReplica* replica) { |
| 70 | if (fInputSourceHasClosed) { // handle closure instead |
| 71 | replica->handleClosure(); |
| 72 | return; |
| 73 | } |
| 74 | |
| 75 | if (replica->fFrameIndex == -1) { |
| 76 | // This replica had stopped playing (or had just been created), but is now actively reading. Note this: |
| 77 | replica->fFrameIndex = fFrameIndex; |
| 78 | ++fNumActiveReplicas; |
| 79 | } |
| 80 | |
| 81 | if (fMasterReplica == NULL) { |
| 82 | // This is the first replica to request the next unread frame. Make it the 'master' replica - meaning that we read the frame |
| 83 | // into its buffer, and then copy from this into the other replicas' buffers. |
| 84 | fMasterReplica = replica; |
| 85 | |
| 86 | // Arrange to read the next frame into this replica's buffer: |
| 87 | if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| 88 | afterGettingFrame, this, onSourceClosure, this); |
| 89 | } else if (replica->fFrameIndex != fFrameIndex) { |
| 90 | // This replica is already asking for the next frame (because it has already received the current frame). Enqueue it: |
| 91 | replica->fNext = fReplicasAwaitingNextFrame; |
| 92 | fReplicasAwaitingNextFrame = replica; |
| 93 | } else { |
| 94 | // This replica is asking for the current frame. Enqueue it: |
| 95 | replica->fNext = fReplicasAwaitingCurrentFrame; |
| 96 | fReplicasAwaitingCurrentFrame = replica; |
| 97 | |
| 98 | if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) { |
| 99 | // The current frame has already arrived, so deliver it to this replica now: |
| 100 | deliverReceivedFrame(); |
| 101 | } |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) { |
| 106 | if (replicaBeingDeactivated->fFrameIndex == -1) return; // this replica has already been deactivated (or was never activated at all) |
| 107 | |
| 108 | // Assert: fNumActiveReplicas > 0 |
| 109 | if (fNumActiveReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n" ); // should not happen |
| 110 | --fNumActiveReplicas; |
| 111 | |
| 112 | // Forget about any frame delivery that might have just been made to this replica: |
| 113 | if (replicaBeingDeactivated->fFrameIndex != fFrameIndex && fNumDeliveriesMadeSoFar > 0) --fNumDeliveriesMadeSoFar; |
| 114 | |
| 115 | replicaBeingDeactivated->fFrameIndex = -1; |
| 116 | |
| 117 | // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame: |
| 118 | if (replicaBeingDeactivated == fMasterReplica) { |
| 119 | // We need to replace the 'master replica', if we can: |
| 120 | if (fReplicasAwaitingCurrentFrame == NULL) { |
| 121 | // There's currently no replacement 'master replica' |
| 122 | fMasterReplica = NULL; |
| 123 | } else { |
| 124 | // There's another replica that we can use as a replacement 'master replica': |
| 125 | fMasterReplica = fReplicasAwaitingCurrentFrame; |
| 126 | fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext; |
| 127 | fMasterReplica->fNext = NULL; |
| 128 | } |
| 129 | |
| 130 | // Check whether the read into the old master replica's buffer is still pending, or has completed: |
| 131 | if (fInputSource != NULL) { |
| 132 | if (fInputSource->isCurrentlyAwaitingData()) { |
| 133 | // We have a pending read into the old master replica's buffer. |
| 134 | // We need to stop it, and retry the read with a new master (if available) |
| 135 | fInputSource->stopGettingFrames(); |
| 136 | |
| 137 | if (fMasterReplica != NULL) { |
| 138 | fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| 139 | afterGettingFrame, this, onSourceClosure, this); |
| 140 | } |
| 141 | } else { |
| 142 | // The read into the old master replica's buffer has already completed. Copy the data to the new master replica (if any): |
| 143 | if (fMasterReplica != NULL) { |
| 144 | StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated); |
| 145 | } else { |
| 146 | // We don't have a new master replica, so we can't copy the received frame to any new replica that might ask for it. |
| 147 | // Fortunately this should be a very rare occurrence. |
| 148 | } |
| 149 | } |
| 150 | } |
| 151 | } else { |
| 152 | // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues: |
| 153 | if (fReplicasAwaitingCurrentFrame != NULL) { |
| 154 | if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) { |
| 155 | fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext; |
| 156 | replicaBeingDeactivated->fNext = NULL; |
| 157 | } |
| 158 | else { |
| 159 | for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) { |
| 160 | if (r1->fNext == replicaBeingDeactivated) { |
| 161 | r1->fNext = replicaBeingDeactivated->fNext; |
| 162 | replicaBeingDeactivated->fNext = NULL; |
| 163 | break; |
| 164 | } |
| 165 | } |
| 166 | } |
| 167 | } |
| 168 | if (fReplicasAwaitingNextFrame != NULL) { |
| 169 | if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) { |
| 170 | fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext; |
| 171 | replicaBeingDeactivated->fNext = NULL; |
| 172 | } |
| 173 | else { |
| 174 | for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) { |
| 175 | if (r2->fNext == replicaBeingDeactivated) { |
| 176 | r2->fNext = replicaBeingDeactivated->fNext; |
| 177 | replicaBeingDeactivated->fNext = NULL; |
| 178 | break; |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | // Check for the possibility that - now that a replica has been deactivated - all other |
| 185 | // replicas have received the current frame, and so now we need to complete delivery to |
| 186 | // the master replica: |
| 187 | if (fMasterReplica != NULL && fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) deliverReceivedFrame(); |
| 188 | } |
| 189 | |
| 190 | if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too |
| 191 | } |
| 192 | |
| 193 | void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) { |
| 194 | // First, handle the replica that's being removed the same way that we would if it were merely being deactivated: |
| 195 | deactivateStreamReplica(replicaBeingRemoved); |
| 196 | |
| 197 | // Assert: fNumReplicas > 0 |
| 198 | if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n" ); // should not happen |
| 199 | --fNumReplicas; |
| 200 | |
| 201 | // If this was the last replica, then delete ourselves (if we were set up to do so): |
| 202 | if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) { |
| 203 | Medium::close(this); |
| 204 | return; |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes, |
| 209 | struct timeval presentationTime, unsigned durationInMicroseconds) { |
| 210 | ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds); |
| 211 | } |
| 212 | |
| 213 | void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes, |
| 214 | struct timeval presentationTime, unsigned durationInMicroseconds) { |
| 215 | // The frame was read into our master replica's buffer. Update the master replica's state, but don't complete delivery to it |
| 216 | // just yet. We do that later, after we're sure that we've delivered it to all other replicas. |
| 217 | fMasterReplica->fFrameSize = frameSize; |
| 218 | fMasterReplica->fNumTruncatedBytes = numTruncatedBytes; |
| 219 | fMasterReplica->fPresentationTime = presentationTime; |
| 220 | fMasterReplica->fDurationInMicroseconds = durationInMicroseconds; |
| 221 | |
| 222 | deliverReceivedFrame(); |
| 223 | } |
| 224 | |
| 225 | void StreamReplicator::onSourceClosure(void* clientData) { |
| 226 | ((StreamReplicator*)clientData)->onSourceClosure(); |
| 227 | } |
| 228 | |
| 229 | void StreamReplicator::onSourceClosure() { |
| 230 | fInputSourceHasClosed = True; |
| 231 | |
| 232 | // Signal the closure to each replica that is currently awaiting a frame: |
| 233 | StreamReplica* replica; |
| 234 | while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
| 235 | fReplicasAwaitingCurrentFrame = replica->fNext; |
| 236 | replica->fNext = NULL; |
| 237 | replica->handleClosure(); |
| 238 | } |
| 239 | while ((replica = fReplicasAwaitingNextFrame) != NULL) { |
| 240 | fReplicasAwaitingNextFrame = replica->fNext; |
| 241 | replica->fNext = NULL; |
| 242 | replica->handleClosure(); |
| 243 | } |
| 244 | if ((replica = fMasterReplica) != NULL) { |
| 245 | fMasterReplica = NULL; |
| 246 | replica->handleClosure(); |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | void StreamReplicator::deliverReceivedFrame() { |
| 251 | // The 'master replica' has received its copy of the current frame. |
| 252 | // Copy it (and complete delivery) to any other replica that has requested this frame. |
| 253 | // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself. |
| 254 | StreamReplica* replica; |
| 255 | while ((replica = fReplicasAwaitingCurrentFrame) != NULL) { |
| 256 | fReplicasAwaitingCurrentFrame = replica->fNext; |
| 257 | replica->fNext = NULL; |
| 258 | |
| 259 | // Assert: fMasterReplica != NULL |
| 260 | if (fMasterReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n" ); // shouldn't happen |
| 261 | StreamReplica::copyReceivedFrame(replica, fMasterReplica); |
| 262 | replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
| 263 | ++fNumDeliveriesMadeSoFar; |
| 264 | |
| 265 | // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to |
| 266 | if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n" , fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen |
| 267 | |
| 268 | // Complete delivery to this replica: |
| 269 | FramedSource::afterGetting(replica); |
| 270 | } |
| 271 | |
| 272 | if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) { |
| 273 | // No more requests for this frame are expected, so complete delivery to the 'master replica': |
| 274 | replica = fMasterReplica; |
| 275 | fMasterReplica = NULL; |
| 276 | replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame |
| 277 | fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame |
| 278 | fNumDeliveriesMadeSoFar = 0; // reset for the next frame |
| 279 | |
| 280 | if (fReplicasAwaitingNextFrame != NULL) { |
| 281 | // One of the other replicas has already requested the next frame, so make it the next 'master replica': |
| 282 | fMasterReplica = fReplicasAwaitingNextFrame; |
| 283 | fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext; |
| 284 | fMasterReplica->fNext = NULL; |
| 285 | |
| 286 | // Arrange to read the next frame into this replica's buffer: |
| 287 | if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize, |
| 288 | afterGettingFrame, this, onSourceClosure, this); |
| 289 | } |
| 290 | |
| 291 | // Move any other replicas that had already requested the next frame to the 'requesting current frame' list: |
| 292 | // Assert: fReplicasAwaitingCurrentFrame == NULL; |
| 293 | if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n" ); // should not happen |
| 294 | fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame; |
| 295 | fReplicasAwaitingNextFrame = NULL; |
| 296 | |
| 297 | // Complete delivery to the 'master' replica (thereby completing all deliveries for this frame): |
| 298 | FramedSource::afterGetting(replica); |
| 299 | } |
| 300 | } |
| 301 | |
| 302 | |
| 303 | ////////// StreamReplica implementation ////////// |
| 304 | |
| 305 | StreamReplica::StreamReplica(StreamReplicator& ourReplicator) |
| 306 | : FramedSource(ourReplicator.envir()), |
| 307 | fOurReplicator(ourReplicator), |
| 308 | fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) { |
| 309 | } |
| 310 | |
| 311 | StreamReplica::~StreamReplica() { |
| 312 | fOurReplicator.removeStreamReplica(this); |
| 313 | } |
| 314 | |
| 315 | void StreamReplica::doGetNextFrame() { |
| 316 | fOurReplicator.getNextFrame(this); |
| 317 | } |
| 318 | |
| 319 | void StreamReplica::doStopGettingFrames() { |
| 320 | fOurReplicator.deactivateStreamReplica(this); |
| 321 | } |
| 322 | |
| 323 | void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) { |
| 324 | // First, figure out how much data to copy. ("toReplica" might have a smaller buffer than "fromReplica".) |
| 325 | unsigned numNewBytesToTruncate |
| 326 | = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0; |
| 327 | toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate; |
| 328 | toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate; |
| 329 | |
| 330 | memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize); |
| 331 | toReplica->fPresentationTime = fromReplica->fPresentationTime; |
| 332 | toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds; |
| 333 | } |
| 334 | |