Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions packages/transport-webrtc/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ export interface WebRTCStreamInit extends AbstractStreamInit {
}

// Max message size that can be sent to the DataChannel
const MAX_MESSAGE_SIZE = 16 * 1024
export const MAX_MESSAGE_SIZE = 16 * 1024

// How much can be buffered to the DataChannel at once
const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024
export const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024

// How long time we wait for the 'bufferedamountlow' event to be emitted
const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000
export const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000

// protobuf field definition overhead
const PROTOBUF_OVERHEAD = 3
export const PROTOBUF_OVERHEAD = 5

// Length of varint, in bytes.
export const VARINT_LENGTH = 2

export class WebRTCStream extends AbstractStream {
/**
Expand All @@ -58,6 +61,10 @@ export class WebRTCStream extends AbstractStream {
private readonly incomingData: Pushable<Uint8Array>

private messageQueue?: Uint8ArrayList

/**
* The maximum size of a message in bytes
*/
private readonly maxDataSize: number

constructor (init: WebRTCStreamInit) {
Expand All @@ -70,7 +77,7 @@ export class WebRTCStream extends AbstractStream {
this.dataChannelOptions = {
bufferedAmountLowEventTimeout: init.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT,
maxBufferedAmount: init.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT,
maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE
maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? init.maxDataSize
}
this.maxDataSize = init.maxDataSize

Expand Down Expand Up @@ -275,7 +282,7 @@ export function createStream (options: WebRTCStreamOptions): WebRTCStream {
return new WebRTCStream({
id: direction === 'inbound' ? (`i${channel.id}`) : `r${channel.id}`,
direction,
maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD,
maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD - VARINT_LENGTH,
dataChannelOptions,
onEnd,
channel,
Expand Down
18 changes: 7 additions & 11 deletions packages/transport-webrtc/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@ import * as lengthPrefixed from 'it-length-prefixed'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { Message } from '../src/pb/message.js'
import { createStream } from '../src/stream.js'
import { MAX_BUFFERED_AMOUNT, MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD, createStream } from '../src/stream.js'

const mockDataChannel = (opts: { send: (bytes: Uint8Array) => void, bufferedAmount?: number }): RTCDataChannel => {
return {
readyState: 'open',
close: () => {},
addEventListener: (_type: string, _listener: () => void) => {},
removeEventListener: (_type: string, _listener: () => void) => {},
close: () => { },
addEventListener: (_type: string, _listener: () => void) => { },
removeEventListener: (_type: string, _listener: () => void) => { },
...opts
} as RTCDataChannel
}

const MAX_MESSAGE_SIZE = 16 * 1024

describe('Max message size', () => {
it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => {
const sent: Uint8ArrayList = new Uint8ArrayList()
const data = new Uint8Array(MAX_MESSAGE_SIZE - 5)
const data = new Uint8Array(MAX_MESSAGE_SIZE - PROTOBUF_OVERHEAD)
const p = pushable()

// Make sure that the data that ought to be sent will result in a message with exactly MAX_MESSAGE_SIZE
Expand All @@ -42,8 +40,7 @@ describe('Max message size', () => {
p.end()
await webrtcStream.sink(p)

// length(message) + message + length(FIN) + FIN
expect(length(sent)).to.equal(4)
expect(length(sent)).to.equal(6)

for (const buf of sent) {
expect(buf.byteLength).to.be.lessThanOrEqual(MAX_MESSAGE_SIZE)
Expand Down Expand Up @@ -80,7 +77,6 @@ describe('Max message size', () => {
})

it('closes the stream if bufferamountlow timeout', async () => {
const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 + 1
const timeout = 100
let closed = false
const webrtcStream = createStream({
Expand All @@ -91,7 +87,7 @@ describe('Max message size', () => {
send: () => {
throw new Error('Expected to not send')
},
bufferedAmount: MAX_BUFFERED_AMOUNT
bufferedAmount: MAX_BUFFERED_AMOUNT + 1
}),
direction: 'outbound',
onEnd: () => {
Expand Down