Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 1290482

Browse files
committed
Subscribed streams are associated with subscriptions.
When subscribe a stream in conference mode, the subscribed stream no longer associated with RemoteStream. It allows a RemoteStream to be subscribed multiple times. It also allows a subscription has audio and video track from different remote streams. Since the signaling protocol defined by server side does not provide an ID for a track, the SDK usually use stream ID + track kind to identify a track. The stream ID and track ID mentioned in conference mode indicate the ID assigned by conference sever, they could be different from MediaStream ID and MediaStreamTrack ID.
1 parent 943e6b4 commit 1290482

File tree

4 files changed

+58
-62
lines changed

4 files changed

+58
-62
lines changed

docs/mdfiles/changelog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
Change Log
22
==========
3+
# 5.1
4+
* When subscribe a stream in conference mode, the subscribed MediaStream or BidirectionalStream is associated with a `Owt.Conference.Subscription` instead of a `Owt.Base.RemoteStream`. The `stream` property of a RemoteStream in conference mode is always undefined, while a new property `stream` is added to `Subscription`. It allows a RemoteStream to be subscribed multiple times, as well as subscribing audio and video tracks from different streams.
35
# 5.0
46
* Add WebTransport support for conference mode, see [this design doc](../../design/webtransport.md) for detailed information.
57
* All publications and subscriptions for the same conference use the same `PeerConnection`.

src/sdk/base/stream.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ export class LocalStream extends Stream {
149149
}
150150
/**
151151
* @class RemoteStream
152-
* @classDesc Stream sent from a remote endpoint.
152+
* @classDesc Stream sent from a remote endpoint. In conference mode,
153+
* RemoteStream's stream is always undefined. Please get MediaStream or
154+
* ReadableStream from subscription's stream property.
153155
* Events:
154156
*
155157
* | Event Name | Argument Type | Fired when |

src/sdk/conference/channel.js

Lines changed: 37 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
4343
this._pendingCandidates = [];
4444
this._subscribePromises = new Map(); // internalId => { resolve, reject }
4545
this._publishPromises = new Map(); // internalId => { resolve, reject }
46-
this._subscribedStreams = new Map(); // intenalId => RemoteStream
4746
this._publications = new Map(); // PublicationId => Publication
4847
this._subscriptions = new Map(); // SubscriptionId => Subscription
4948
this._publishTransceivers = new Map(); // internalId => { id, transceivers: [Transceiver] }
@@ -52,9 +51,8 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
5251
// Timer for PeerConnection disconnected. Will stop connection after timer.
5352
this._disconnectTimer = null;
5453
this._ended = false;
55-
this._stopped = false;
5654
// Channel ID assigned by conference
57-
this._id = undefined;
55+
this._transportId = undefined;
5856
// Used to create internal ID for publication/subscription
5957
this._internalCount = 0;
6058
this._sdpPromise = Promise.resolve();
@@ -300,11 +298,11 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
300298
return this._signaling.sendSignalingMessage('publish', {
301299
media: {tracks: trackOptions},
302300
attributes: stream.attributes,
303-
transport: {id: this._id, type: 'webrtc'},
301+
transport: {id: this._transportId, type: 'webrtc'},
304302
}).catch((e) => {
305303
// Send SDP even when failed to get Answer.
306304
this._signaling.sendSignalingMessage('soac', {
307-
id: this._id,
305+
id: this._transportId,
308306
signaling: localDesc,
309307
});
310308
throw e;
@@ -320,10 +318,10 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
320318
this._publishTransceivers.get(internalId).id = publicationId;
321319
this._reverseIdMap.set(publicationId, internalId);
322320

323-
if (this._id && this._id !== data.transportId) {
321+
if (this._transportId && this._transportId !== data.transportId) {
324322
Logger.warning('Server returns conflict ID: ' + data.transportId);
325323
}
326-
this._id = data.transportId;
324+
this._transportId = data.transportId;
327325

328326
// Modify local SDP before sending
329327
if (options) {
@@ -335,7 +333,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
335333
});
336334
}
337335
this._signaling.sendSignalingMessage('soac', {
338-
id: this._id,
336+
id: this._transportId,
339337
signaling: localDesc,
340338
});
341339
}).catch((e) => {
@@ -483,7 +481,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
483481
offerOptions.offerToReceiveVideo = !!options.video;
484482
}
485483
this._subscribeTransceivers.set(internalId, {transceivers});
486-
this._subscribedStreams.set(internalId, stream);
487484

488485
let localDesc;
489486
this._pc.createOffer(offerOptions).then((desc) => {
@@ -510,11 +507,11 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
510507
});
511508
return this._signaling.sendSignalingMessage('subscribe', {
512509
media: {tracks: trackOptions},
513-
transport: {id: this._id, type: 'webrtc'},
510+
transport: {id: this._transportId, type: 'webrtc'},
514511
}).catch((e) => {
515512
// Send SDP even when failed to get Answer.
516513
this._signaling.sendSignalingMessage('soac', {
517-
id: this._id,
514+
id: this._transportId,
518515
signaling: localDesc,
519516
});
520517
throw e;
@@ -529,10 +526,10 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
529526

530527
this._subscribeTransceivers.get(internalId).id = subscriptionId;
531528
this._reverseIdMap.set(subscriptionId, internalId);
532-
if (this._id && this._id !== data.transportId) {
529+
if (this._transportId && this._transportId !== data.transportId) {
533530
Logger.warning('Server returns conflict ID: ' + data.transportId);
534531
}
535-
this._id = data.transportId;
532+
this._transportId = data.transportId;
536533

537534
// Modify local SDP before sending
538535
if (options) {
@@ -542,7 +539,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
542539
});
543540
}
544541
this._signaling.sendSignalingMessage('soac', {
545-
id: this._id,
542+
id: this._transportId,
546543
signaling: localDesc,
547544
});
548545
}).catch((e) => {
@@ -674,11 +671,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
674671
new ConferenceError('Failed to subscribe'));
675672
}
676673
}
677-
// Clear media stream
678-
if (this._subscribedStreams.has(internalId)) {
679-
this._subscribedStreams.get(internalId).mediaStream = null;
680-
this._subscribedStreams.delete(internalId);
681-
}
682674
if (this._sdpResolverMap.has(internalId)) {
683675
const resolver = this._sdpResolverMap.get(internalId);
684676
if (!resolver.finish) {
@@ -732,31 +724,20 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
732724

733725
_onRemoteStreamAdded(event) {
734726
Logger.debug('Remote stream added.');
735-
let find = false;
736727
for (const [internalId, sub] of this._subscribeTransceivers) {
737-
const subscriptionId = sub.id;
738728
if (sub.transceivers.find((t) => t.transceiver === event.transceiver)) {
739-
find = true;
740-
const subscribedStream = this._subscribedStreams.get(internalId);
741-
if (!subscribedStream.mediaStream) {
742-
this._subscribedStreams.get(internalId).mediaStream =
743-
event.streams[0];
744-
// Resolve subscription if ready handler has been called
745-
const subscription = this._subscriptions.get(subscriptionId);
746-
if (subscription) {
747-
this._subscribePromises.get(internalId).resolve(subscription);
748-
}
749-
} else {
750-
// Add track to the existing stream
751-
subscribedStream.mediaStream.addTrack(event.track);
729+
const subscription = this._subscription.get(internalId);
730+
subscription.stream = event.streams[0];
731+
if (this._subscribePromises.has(internalId)) {
732+
this._subscribePromises.get(internalId).resolve(subscription);
733+
this._subscribePromises.delete(internalId);
752734
}
735+
return;
753736
}
754737
}
755-
if (!find) {
756-
// This is not expected path. However, this is going to happen on Safari
757-
// because it does not support setting direction of transceiver.
758-
Logger.warning('Received remote stream without subscription.');
759-
}
738+
// This is not expected path. However, this is going to happen on Safari
739+
// because it does not support setting direction of transceiver.
740+
Logger.warning('Received remote stream without subscription.');
760741
}
761742

762743
_onLocalIceCandidate(event) {
@@ -840,7 +821,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
840821

841822
_sendCandidate(candidate) {
842823
this._signaling.sendSignalingMessage('soac', {
843-
id: this._id,
824+
id: this._transportId,
844825
signaling: {
845826
type: 'candidate',
846827
candidate: {
@@ -896,16 +877,10 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
896877
(trackKind) => this._muteOrUnmute(sessionId, false, false, trackKind),
897878
(options) => this._applyOptions(sessionId, options));
898879
this._subscriptions.set(sessionId, subscription);
899-
// Fire subscription's ended event when associated stream is ended.
900-
this._subscribedStreams.get(internalId).addEventListener('ended', () => {
901-
if (this._subscriptions.has(sessionId)) {
902-
this._subscriptions.get(sessionId).dispatchEvent(
903-
'ended', new OwtEvent('ended'));
904-
}
905-
});
906-
// Resolve subscription if mediaStream is ready
907-
if (this._subscribedStreams.get(internalId).mediaStream) {
880+
// Resolve subscription if mediaStream is ready.
881+
if (this._subscriptions.get(internalId).stream) {
908882
this._subscribePromises.get(internalId).resolve(subscription);
883+
this._subscribePromises.delete(internalId);
909884
}
910885
} else if (this._publishPromises.has(internalId)) {
911886
const publication = new Publication(sessionId, () => {
@@ -1056,21 +1031,23 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
10561031
return sdp;
10571032
}
10581033

1059-
// Handle stream event sent from MCU. Some stream events should be publication
1060-
// event or subscription event. It will be handled here.
1034+
// Handle stream event sent from MCU. Some stream update events sent from
1035+
// server, more specifically audio.status and video.status events should be
1036+
// publication event or subscription events. They don't change MediaStream's
1037+
// status. See
1038+
// https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md#339-participant-is-notified-on-streams-update-in-room
1039+
// for more information.
10611040
_onStreamEvent(message) {
10621041
const eventTargets = [];
10631042
if (this._publications.has(message.id)) {
10641043
eventTargets.push(this._publications.get(message.id));
10651044
}
1066-
for (const [internalId, subscribedStream] of this._subscribedStreams) {
1067-
if (message.id === subscribedStream.id) {
1068-
const subscriptionId = this._subscribeTransceivers.get(internalId).id;
1069-
eventTargets.push(this._subscriptions.get(subscriptionId));
1070-
break;
1045+
for (const subscription of this._subscriptions) {
1046+
if (message.id === subscription._audioTrackId ||
1047+
message.id === subscription._videoTrackId) {
1048+
eventTargets.push(subscription);
10711049
}
10721050
}
1073-
10741051
if (!eventTargets.length) {
10751052
return;
10761053
}
@@ -1100,9 +1077,9 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
11001077
// Only check the first one.
11011078
const param = obj[0];
11021079
return !!(
1103-
param.codecPayloadType || param.dtx || param.active || param.ptime ||
1104-
param.maxFramerate || param.scaleResolutionDownBy || param.rid ||
1105-
param.scalabilityMode);
1080+
param.codecPayloadType || param.dtx || param.active || param.ptime ||
1081+
param.maxFramerate || param.scaleResolutionDownBy || param.rid ||
1082+
param.scalabilityMode);
11061083
}
11071084

11081085
_isOwtEncodingParameters(obj) {

src/sdk/conference/subscription.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ export class SubscriptionUpdateOptions {
270270
*/
271271
export class Subscription extends EventDispatcher {
272272
// eslint-disable-next-line require-jsdoc
273-
constructor(id, stop, getStats, mute, unmute, applyOptions) {
273+
constructor(id, stream, stop, getStats, mute, unmute, applyOptions) {
274274
super();
275275
if (!id) {
276276
throw new TypeError('ID cannot be null or undefined.');
@@ -285,6 +285,16 @@ export class Subscription extends EventDispatcher {
285285
writable: false,
286286
value: id,
287287
});
288+
/**
289+
* @member {MediaStream | BidirectionalStream} stream
290+
* @instance
291+
* @memberof Owt.Conference.Subscription
292+
*/
293+
Object.defineProperty(this, 'stream', {
294+
configurable: false,
295+
writable: false,
296+
value: stream,
297+
});
288298
/**
289299
* @function stop
290300
* @instance
@@ -328,5 +338,10 @@ export class Subscription extends EventDispatcher {
328338
* @returns {Promise<undefined, Error>}
329339
*/
330340
this.applyOptions = applyOptions;
341+
342+
// Track is not defined in server protocol. So these IDs are equal to their
343+
// stream's ID at this time.
344+
this._audioTrackId = undefined;
345+
this._videoTrackId = undefined;
331346
}
332347
}

0 commit comments

Comments
 (0)