Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/dc-reliability
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="changed" "Improve reliable data channel buffering, sequencing, and add integration tests"
174 changes: 159 additions & 15 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import '../track/local/local.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
import '../types/other.dart';
import '../utils/data_packet_buffer.dart';
import '../utils/ttl_map.dart';
import 'signal_client.dart';
import 'transport.dart';

Expand Down Expand Up @@ -154,6 +156,24 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
_e2eeManager = e2eeManager;
}

// E2E reliability for data channels
int _reliableDataSequence = 1;
final DataPacketBuffer _reliableMessageBuffer = DataPacketBuffer(
maxBufferSize: 64 * 1024 * 1024, // 64MB
maxPacketCount: 1000, // max 1000 packets
);
final TTLMap<String, int> _reliableReceivedState = TTLMap<String, int>(30000);
bool _isReconnecting = false;

String? _reliableParticipantKey(lk_models.DataPacket packet) {
if (packet.hasParticipantSid() && packet.participantSid.isNotEmpty) {
return packet.participantSid;
}
logger.fine(
'Reliable packet missing participant SID (identity: ${packet.participantIdentity}), skipping dedupe handling');
return null;
}

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
Expand Down Expand Up @@ -188,6 +208,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
await events.dispose();
await _signalListener.dispose();
_reliableReceivedState.dispose();
});
}

Expand Down Expand Up @@ -265,6 +286,12 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
fullReconnectOnNext = false;
attemptingReconnect = false;

// Reset reliability state
_reliableDataSequence = 1;
_reliableMessageBuffer.clear();
_reliableReceivedState.clear();
_isReconnecting = false;

clearPendingReconnect();
}

Expand Down Expand Up @@ -333,34 +360,67 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
return completer.future;
}

Future<void> _resendReliableMessagesForResume(int lastMessageSeq) async {
logger.fine('Resending reliable messages from sequence $lastMessageSeq');

final channel = _publisherDataChannel(Reliability.reliable);
if (channel == null) {
logger.warning('Reliable data channel is null, cannot resend messages');
return;
}

// Remove acknowledged messages from buffer
_reliableMessageBuffer.popToSequence(lastMessageSeq);

// Get remaining messages to resend
final messagesToResend = _reliableMessageBuffer.getAll();

if (messagesToResend.isEmpty) {
logger.fine('No reliable messages to resend');
return;
}

logger.fine('Resending ${messagesToResend.length} reliable messages');

for (final item in messagesToResend) {
try {
await channel.send(item.message);
logger.fine('Resent reliable message with sequence ${item.sequence}');
} catch (e) {
logger.warning('Failed to resend reliable message ${item.sequence}: $e');
}
}
}

@internal
Future<void> sendDataPacket(
lk_models.DataPacket packet, {
bool? reliability = true,
Reliability reliability = Reliability.lossy,
}) async {
// Add sequence number for reliable packets
if (reliability == Reliability.reliable) {
packet.sequence = _reliableDataSequence++;
}

// construct the data channel message
var message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer());

final reliabilityType = reliability == true ? Reliability.reliable : Reliability.lossy;

if (_subscriberPrimary) {
// make sure publisher transport is connected

await _publisherEnsureConnected();

// wait for data channel to open (if not already)
if (_publisherDataChannelState(reliabilityType) != rtc.RTCDataChannelState.RTCDataChannelOpen) {
logger.fine('Waiting for data channel ${reliabilityType} to open...');
if (_publisherDataChannelState(reliability) != rtc.RTCDataChannelState.RTCDataChannelOpen) {
logger.fine('Waiting for data channel ${reliability} to open...');
await events.waitFor<PublisherDataChannelStateUpdatedEvent>(
filter: (event) => event.type == reliabilityType,
filter: (event) => event.type == reliability,
duration: connectOptions.timeouts.connection,
);
}
}

// chose data channel
final rtc.RTCDataChannel? channel =
_publisherDataChannel(reliability == true ? Reliability.reliable : Reliability.lossy);
final rtc.RTCDataChannel? channel = _publisherDataChannel(reliability);

if (channel == null) {
throw UnexpectedStateException('Data channel for ${packet.kind.toSDKType()} is null');
Expand Down Expand Up @@ -388,16 +448,38 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
kind: packet.kind,
encryptedPacket: encryptedPacket,
destinationIdentities: packet.destinationIdentities,
sequence: packet.hasSequence() ? packet.sequence : null,
participantSid: packet.hasParticipantSid() ? packet.participantSid : null,
);

message = rtc.RTCDataChannelMessage.fromBinary(dataToSend.writeToBuffer());
}
}

logger.fine('sendDataPacket(label:${channel.label})');
// Buffer reliable packets for potential resending
if (reliability == Reliability.reliable) {
_reliableMessageBuffer.push(BufferedDataPacket(
packet: packet,
message: message,
sequence: packet.sequence,
));
}

// Don't send during reconnection, but keep message buffered for resending
if (_isReconnecting) {
logger.fine('Deferring data packet send during reconnection (will resend when resumed)');
return;
}

logger.fine('sendDataPacket(label:${channel.label}, sequence:${packet.sequence})');
await channel.send(message);

_dcBufferStatus[reliabilityType] = await channel.getBufferedAmount() <= channel.bufferedAmountLowThreshold!;
_dcBufferStatus[reliability] = await channel.getBufferedAmount() <= channel.bufferedAmountLowThreshold!;

// Align buffer with WebRTC buffer for reliable packets
if (reliability == Reliability.reliable) {
_reliableMessageBuffer.alignBufferedAmount(await channel.getBufferedAmount());
}
}

Future<void> _publisherEnsureConnected() async {
Expand Down Expand Up @@ -629,7 +711,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
type: Reliability.lossy,
)));
// _onDCStateUpdated(Reliability.lossy, state)
_lossyDCPub?.bufferedAmountLowThreshold = 65535;
_lossyDCPub?.bufferedAmountLowThreshold = 2 * 1024 * 1024;
_lossyDCPub?.onBufferedAmountLow = (_) {
_dcBufferStatus[Reliability.lossy] = (_lossyDCPub!.bufferedAmount! <= _lossyDCPub!.bufferedAmountLowThreshold!);
};
Expand All @@ -648,7 +730,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
state: state,
type: Reliability.reliable,
)));
_reliableDCPub?.bufferedAmountLowThreshold = 65535;
_reliableDCPub?.bufferedAmountLowThreshold = 2 * 1024 * 1024;
_reliableDCPub?.onBufferedAmountLow = (_) {
_dcBufferStatus[Reliability.reliable] =
(_reliableDCPub!.bufferedAmount! <= _reliableDCPub!.bufferedAmountLowThreshold!);
Expand Down Expand Up @@ -722,12 +804,51 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.warning('Failed to decrypt data packet');
return;
}

final decryptedPacketPayload = lk_models.EncryptedPacketPayload.fromBuffer(decryptedData);
final newDp = asDataPacket(decryptedPacketPayload);

// Handle sequence numbers for reliable packets (encrypted outer packet)
if (dp.kind == lk_models.DataPacket_Kind.RELIABLE && dp.hasSequence()) {
final participantKey = _reliableParticipantKey(dp);
if (participantKey != null) {
final sequence = dp.sequence;
final lastReceived = _reliableReceivedState.get(participantKey) ?? 0;
if (sequence <= lastReceived) {
logger.fine('Ignoring duplicate or out-of-order packet: '
'sequence=$sequence, lastReceived=$lastReceived, participantSid=$participantKey');
return;
}
_reliableReceivedState.set(participantKey, sequence);
}
}

// Preserve metadata from the outer packet on the decrypted packet
newDp
..kind = dp.kind
..sequence = dp.sequence
..participantIdentity = dp.participantIdentity
..participantSid = dp.participantSid
..destinationIdentities.addAll(dp.destinationIdentities);

_emitDataPacket(newDp, encryptionType: dp.encryptedPacket.encryptionType.toLkType());
} else {
// Handle sequence numbers for reliable packets (plaintext)
if (dp.kind == lk_models.DataPacket_Kind.RELIABLE && dp.hasSequence()) {
final participantKey = _reliableParticipantKey(dp);
if (participantKey != null) {
final sequence = dp.sequence;
final lastReceived = _reliableReceivedState.get(participantKey) ?? 0;
if (sequence <= lastReceived) {
logger.fine('Ignoring duplicate or out-of-order packet: '
'sequence=$sequence, lastReceived=$lastReceived, participantSid=$participantKey');
return;
}
_reliableReceivedState.set(participantKey, sequence);
} else {
logger.fine('Reliable packet without participant SID, skipping dedupe');
}
}

_emitDataPacket(dp);
}
}
Expand Down Expand Up @@ -815,6 +936,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

logger.info('onDisconnected state:${connectionState} reason:${reason.name}');

_isReconnecting = true;

if (reconnectAttempts == 0) {
reconnectStart = DateTime.timestamp();
}
Expand Down Expand Up @@ -891,6 +1014,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}
clearPendingReconnect();
attemptingReconnect = false;
_isReconnecting = false;
} catch (e) {
reconnectAttempts = reconnectAttempts! + 1;
bool recoverable = true;
Expand Down Expand Up @@ -964,6 +1088,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('resumeConnection: primary connected');
}

_isReconnecting = false;
events.emit(const EngineResumedEvent());
}

Expand Down Expand Up @@ -1030,12 +1155,25 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
required List<String> trackSidsDisabled,
}) async {
final previousAnswer = (await subscriber?.pc.getLocalDescription())?.toPBType();

// Build data channel receive states for reliability
final dataChannelReceiveStates = <lk_rtc.DataChannelReceiveState>[];
for (final participantId in List.of(_reliableReceivedState.keys)) {
final lastSequence = _reliableReceivedState.get(participantId);
if (lastSequence != null) {
final receiveState = lk_rtc.DataChannelReceiveState();
receiveState.publisherSid = participantId;
receiveState.lastSeq = lastSequence;
dataChannelReceiveStates.add(receiveState);
}
}
signalClient.sendSyncState(
answer: previousAnswer,
subscription: subscription,
publishTracks: publishTracks,
dataChannelInfo: dataChannelInfo(),
trackSidsDisabled: trackSidsDisabled,
dataChannelReceiveStates: dataChannelReceiveStates,
);
}

Expand Down Expand Up @@ -1092,7 +1230,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

logger.fine('Handle ReconnectResponse: '
'iceServers: ${event.response.iceServers}, '
'forceRelay: $event.response.clientConfiguration.forceRelay');
'forceRelay: $event.response.clientConfiguration.forceRelay, '
'lastMessageSeq: ${event.response.lastMessageSeq}');

final rtcConfiguration = await _buildRtcConfiguration(
serverResponseForceRelay: event.response.clientConfiguration.forceRelay,
Expand All @@ -1105,6 +1244,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await negotiate();
}

// Handle reliable message resending
if (event.response.hasLastMessageSeq()) {
await _resendReliableMessagesForResume(event.response.lastMessageSeq);
}

events.emit(const SignalReconnectedEvent());
})
..on<SignalConnectedEvent>((event) async {
Expand Down
2 changes: 2 additions & 0 deletions lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ extension SignalClientRequests on SignalClient {
required Iterable<lk_rtc.TrackPublishedResponse>? publishTracks,
required Iterable<lk_rtc.DataChannelInfo>? dataChannelInfo,
required List<String> trackSidsDisabled,
List<lk_rtc.DataChannelReceiveState>? dataChannelReceiveStates,
}) =>
_sendRequest(lk_rtc.SignalRequest(
syncState: lk_rtc.SyncState(
Expand All @@ -474,6 +475,7 @@ extension SignalClientRequests on SignalClient {
publishTracks: publishTracks,
dataChannels: dataChannelInfo,
trackSidsDisabled: trackSidsDisabled,
datachannelReceiveStates: dataChannelReceiveStates,
),
));

Expand Down
6 changes: 4 additions & 2 deletions lib/src/data_stream/stream_writer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ class WritableStream<T> implements StreamWriter<T> {
streamId: streamId,
);
final trailerPacket = lk_models.DataPacket(
kind: lk_models.DataPacket_Kind.RELIABLE,
destinationIdentities: destinationIdentities,
streamTrailer: trailer,
);
await engine.sendDataPacket(trailerPacket, reliability: true);
await engine.sendDataPacket(trailerPacket, reliability: Reliability.reliable);
}

@override
Expand All @@ -73,10 +74,11 @@ class WritableStream<T> implements StreamWriter<T> {
chunkIndex: Int64(chunkId),
);
final chunkPacket = lk_models.DataPacket(
kind: lk_models.DataPacket_Kind.RELIABLE,
destinationIdentities: destinationIdentities,
streamChunk: chunk,
);
await engine.sendDataPacket(chunkPacket, reliability: true);
await engine.sendDataPacket(chunkPacket, reliability: Reliability.reliable);
chunkId += 1;
}
}
Expand Down
Loading