Skip to content

Commit 92e4eff

Browse files
committed
wip webtransport support
1 parent b026315 commit 92e4eff

File tree

4 files changed

+281
-3
lines changed

4 files changed

+281
-3
lines changed

src/lib/libsockfs.js

Lines changed: 271 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ addToLibrary({
88
$SOCKFS__postset: () => {
99
addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);');
1010
},
11-
$SOCKFS__deps: ['$FS'],
11+
$SOCKFS__deps: ['$FS', '$DNS'],
1212
$SOCKFS: {
1313
#if expectToReceiveOnModule('websocket')
1414
websocketArgs: {},
@@ -69,6 +69,8 @@ addToLibrary({
6969
pending: [],
7070
recv_queue: [],
7171
#if SOCKET_WEBRTC
72+
#elif SOCKET_WEBTRANSPORT
73+
sock_ops: SOCKFS.webtransport_sock_ops
7274
#else
7375
sock_ops: SOCKFS.websocket_sock_ops
7476
#endif
@@ -138,6 +140,273 @@ addToLibrary({
138140
return `socket[${SOCKFS.nextname.current++}]`;
139141
},
140142
// backend-specific stream ops
143+
#if SOCKET_WEBRTC
144+
#elif SOCKET_WEBTRANSPORT
145+
webtransport_sock_ops: {
146+
getSession(sock, addr, port) {
147+
return sock.peers[`${addr}:${port}`];
148+
},
149+
initSession(sock, session, addr, port) {
150+
sock.peers[`${addr}:${port}`] = session;
151+
152+
/* buffer writes before session is ready */
153+
const outgoing = [];
154+
155+
session.write = (buffer) => {
156+
outgoing.push(buffer);
157+
};
158+
159+
/* prevent unhandled rejections before main loop */
160+
session.ready.catch(() => {});
161+
session.closed.catch(() => {});
162+
163+
(async () => {
164+
try {
165+
await session.ready;
166+
167+
const writer = session.datagrams.writable.getWriter();
168+
let first = true;
169+
170+
while (outgoing.length) {
171+
writer.write(outgoing.shift()).catch(e => {});
172+
}
173+
174+
session.write = (buffer) => {
175+
writer.write(buffer).catch(e => {});
176+
};
177+
178+
for await (const packet of session.datagrams.readable) {
179+
// handle the internal port identification message
180+
if (first && packet[0] === 0xff && packet[1] === 0xff && packet[2] === 0xff && packet[3] === 0xff &&
181+
packet[4] === 'p' && packet[5] === 'o' && packet[6] === 'r' && packet[7] === 't') {
182+
// update cache key
183+
delete sock.peers[`${addr}:${port}`];
184+
port = parseInt(String.fromCharCode.apply(null, packet.subarray(9)), 10);
185+
sock.peers[`${addr}:${port}`] = session;
186+
} else {
187+
sock.recv_queue.push({ addr: addr, port: port, buffer: packet });
188+
}
189+
190+
if (sock.pendingPollResolve) {
191+
sock.pendingPollResolve();
192+
}
193+
194+
first = false;
195+
}
196+
} catch (e) {
197+
console.error(`Session ${addr}:${port} terminated`, e);
198+
} finally {
199+
console.log(`Removing peer ${addr}:${port}`);
200+
delete sock.peers[`${addr}:${port}`];
201+
}
202+
})();
203+
},
204+
newSession(sock, addr, port) {
205+
let hostname = DNS.lookup_addr(addr);
206+
207+
if (!hostname) {
208+
hostname = addr;
209+
}
210+
211+
const session = new WebTransport(`https://${hostname}:${port}`);
212+
213+
console.log(`New session https://${hostname}:${port}`);
214+
215+
SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);
216+
217+
// send the original bound port number to the peer
218+
if (sock.type === {{{ cDefs.SOCK_DGRAM }}} && typeof sock.sport != 'undefined') {
219+
const msg = Uint8Array.from(`\xff\xff\xff\xffport ${sock.sport}\x00`, x => x.charCodeAt(0));
220+
session.write(msg);
221+
}
222+
223+
return session;
224+
},
225+
acceptSession(sock, session) {
226+
#if ENVIRONMENT_MAY_BE_NODE
227+
const split = session.peerAddress.split(':');
228+
229+
const addr = split[0];
230+
const port = parseInt(split[1], 10);
231+
232+
console.log(`Accept session ${addr}:${port}`);
233+
234+
SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);
235+
#endif
236+
},
237+
stopListenServer(sock) {
238+
#if ENVIRONMENT_MAY_BE_NODE
239+
if (!ENVIRONMENT_IS_NODE) {
240+
return;
241+
}
242+
243+
if (!sock.h3) {
244+
return;
245+
}
246+
247+
sock.h3.stopServer();
248+
sock.h3 = null;
249+
#endif
250+
},
251+
startListenServer(sock) {
252+
#if ENVIRONMENT_MAY_BE_NODE
253+
if (!ENVIRONMENT_IS_NODE) {
254+
return;
255+
}
256+
257+
SOCKFS.webtransport_sock_ops.stopListenServer(sock);
258+
259+
sock.h3 = new Http3Server({
260+
host: sock.saddr,
261+
port: sock.sport,
262+
secret: require('crypto').randomBytes(16).toString('hex'),
263+
cert: Module['cert'],
264+
privKey: Module['key']
265+
});
266+
267+
(async () => {
268+
try {
269+
sock.h3.startServer();
270+
271+
await sock.h3.ready;
272+
273+
const stream = await sock.h3.sessionStream('/');
274+
275+
console.log(`Listening on ${sock.h3.host}:${sock.h3.port}`);
276+
277+
for await (const session of stream) {
278+
SOCKFS.webtransport_sock_ops.acceptSession(sock, session);
279+
}
280+
} catch (e) {
281+
sock.error = {{{ cDefs.EHOSTUNREACH }}};
282+
} finally {
283+
sock.h3 = null;
284+
}
285+
})();
286+
#endif
287+
},
288+
289+
// actual sock ops
290+
#if ASYNCIFY
291+
async poll(sock, timeout)
292+
#else
293+
poll(sock, timeout)
294+
#endif
295+
{
296+
let mask = 0;
297+
298+
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
299+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
300+
} else {
301+
#if ASYNCIFY
302+
if (!sock.recv_queue.length) {
303+
await new Promise((resolve, reject) => {
304+
sock.pendingPromiseResolve = resolve;
305+
setTimeout(resolve, timeout);
306+
}).finally(() => {
307+
sock.pendingPromiseResolve = null;
308+
});
309+
}
310+
#endif
311+
312+
if (sock.recv_queue.length) {
313+
mask |= {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}};
314+
}
315+
316+
/* always ready to write */
317+
mask |= {{{ cDefs.POLLOUT }}};
318+
}
319+
320+
return mask;
321+
},
322+
ioctl(sock, request, arg) {
323+
switch (request) {
324+
default:
325+
return {{{ cDefs.EINVAL }}};
326+
}
327+
},
328+
close(sock) {
329+
for (const session of Object.values(sock.peers)) {
330+
session.close();
331+
}
332+
333+
SOCKFS.webtransport_sock_ops.stopListenServer(sock);
334+
335+
return 0;
336+
},
337+
bind(sock, addr, port) {
338+
if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
339+
throw new FS.ErrnoError({{{ cDefs.EINVAL }}}); // already bound
340+
}
341+
342+
sock.saddr = addr;
343+
sock.sport = port;
344+
345+
if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) {
346+
SOCKFS.webtransport_sock_ops.startListenServer(sock);
347+
} else {
348+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
349+
}
350+
},
351+
connect(sock, addr, port) {
352+
if (sock.h3) {
353+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
354+
}
355+
356+
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
357+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
358+
} else {
359+
sock.daddr = addr;
360+
sock.dport = port;
361+
}
362+
},
363+
listen(sock, backlog) {
364+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
365+
},
366+
sendmsg(sock, buffer, offset, length, addr, port) {
367+
let session = null;
368+
369+
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
370+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
371+
} else {
372+
if (addr === undefined || port === undefined) {
373+
addr = sock.daddr;
374+
port = sock.dport;
375+
}
376+
377+
session = SOCKFS.webtransport_sock_ops.getSession(sock, addr, port);
378+
379+
if (!session) {
380+
session = SOCKFS.webtransport_sock_ops.newSession(sock, addr, port);
381+
}
382+
}
383+
384+
if (!session) {
385+
throw new FS.ErrnoError({{{ cDefs.EDESTADDRREQ }}});
386+
}
387+
388+
// copy off the buffer because write is async
389+
buffer = buffer.slice(offset, offset + length);
390+
391+
session.write(buffer);
392+
393+
return length;
394+
},
395+
recvmsg(sock, length) {
396+
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
397+
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
398+
}
399+
400+
const msg = sock.recv_queue.shift();
401+
402+
if (!msg) {
403+
throw new FS.ErrnoError({{{ cDefs.EAGAIN }}});
404+
}
405+
406+
return msg;
407+
},
408+
},
409+
#else
141410
websocket_sock_ops: {
142411
//
143412
// peers are a small wrapper around a WebSocket to help in
@@ -728,6 +997,7 @@ addToLibrary({
728997
return res;
729998
}
730999
}
1000+
#endif
7311001
},
7321002

7331003
/*

src/lib/libsyscall.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,11 +484,11 @@ var SyscallsLibrary = {
484484
var sock = getSocketFromFD(fd);
485485
if (!addr) {
486486
// send, no address provided
487-
return FS.write(sock.stream, HEAP8, message, length);
487+
return FS.write(sock.stream, HEAPU8, message, length);
488488
}
489489
var dest = getSocketAddress(addr, addr_len);
490490
// sendto an address
491-
return sock.sock_ops.sendmsg(sock, HEAP8, message, length, dest.addr, dest.port);
491+
return sock.sock_ops.sendmsg(sock, HEAPU8, message, length, dest.addr, dest.port);
492492
},
493493
__syscall_getsockopt__deps: ['$getSocketFromFD'],
494494
__syscall_getsockopt: (fd, level, optname, optval, optlen, d1) => {

src/modularize.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
import source wasmModule from './{{{ WASM_BINARY_FILE }}}';
1212
#endif
1313

14+
#if SOCKET_WEBTRANSPORT && ENVIRONMENT_MAY_BE_NODE
15+
#if EXPORT_ES6
16+
import { Http3Server, WebTransport } from '@fails-components/webtransport';
17+
#endif
18+
#endif
19+
1420
#if ENVIRONMENT_MAY_BE_WEB && !EXPORT_ES6 && !(MINIMAL_RUNTIME && !PTHREADS)
1521
// Single threaded MINIMAL_RUNTIME programs do not need access to
1622
// document.currentScript, so a simple export declaration is enough.

src/settings.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,8 @@ var FS_DEBUG = false;
402402
// [link]
403403
var SOCKET_WEBRTC = false;
404404

405+
var SOCKET_WEBTRANSPORT = false;
406+
405407
// A string containing either a WebSocket URL prefix (ws:// or wss://) or a complete
406408
// RFC 6455 URL - "ws[s]:" "//" host [ ":" port ] path [ "?" query ].
407409
// In the (default) case of only a prefix being specified the URL will be constructed from

0 commit comments

Comments
 (0)