Skip to content

Commit e3d9703

Browse files
authored
feat(miniflare): support node bindings over dev registry (#10142)
1 parent d481901 commit e3d9703

File tree

6 files changed

+824
-344
lines changed

6 files changed

+824
-344
lines changed

.changeset/nine-moose-decide.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"miniflare": patch
3+
---
4+
5+
fix: support `mf.getBindings()` when dev registry is enabled
6+
7+
Fixes a deadlock when using bindings from `mf.getBindings()` with the dev registry enabled. The deadlock happened because the runtime attempted to resolve a worker address via the loopback server, which was blocked by the Node.js thread waiting on the same runtime.
8+
9+
Address lookup has been moved to a proxy running in a worker thread to avoid blocking the main thread.

packages/miniflare/scripts/build.mjs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,14 @@ async function buildPackage() {
153153
const pkg = getPackage(pkgRoot);
154154

155155
const indexPath = path.join(pkgRoot, "src", "index.ts");
156+
// The dev registry proxy runs in a Node.js worker thread (instead of workerd) and
157+
// requires a separate entry file
158+
const devRegistryProxyPath = path.join(
159+
pkgRoot,
160+
"src",
161+
"shared",
162+
"dev-registry.worker.ts"
163+
);
156164
// Look for test files ending with .spec.ts in the test directory, default to
157165
// empty array if not found
158166
let testPaths = [];
@@ -190,7 +198,7 @@ async function buildPackage() {
190198
logLevel: watch ? "info" : "warning",
191199
outdir: outPath,
192200
outbase: pkgRoot,
193-
entryPoints: [indexPath, ...testPaths],
201+
entryPoints: [indexPath, devRegistryProxyPath, ...testPaths],
194202
};
195203
if (watch) {
196204
const ctx = await esbuild.context(buildOptions);

packages/miniflare/src/index.ts

Lines changed: 21 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,6 @@ import {
111111
createInboundDoProxyService,
112112
createOutboundDoProxyService,
113113
createProxyFallbackService,
114-
extractDoFetchProxyTarget,
115-
extractServiceFetchProxyTarget,
116114
getHttpProxyOptions,
117115
getOutboundDoProxyClassName,
118116
getProtocol,
@@ -397,13 +395,12 @@ function getDurableObjectClassNames(
397395

398396
/**
399397
* This collects all external service bindings from all workers and overrides
400-
* it to point to a proxy in the loopback server. A fallback service will be created
398+
* it to point to the dev registry proxy. A fallback service will be created
401399
* for each of the external service in case the external service is not available.
402400
*/
403401
function getExternalServiceEntrypoints(
404402
allWorkerOpts: PluginWorkerOptions[],
405-
loopbackHost: string,
406-
loopbackPort: number
403+
proxyAddress: string
407404
) {
408405
const externalServices = new Map<
409406
string,
@@ -447,7 +444,7 @@ function getExternalServiceEntrypoints(
447444
// Override it to connect to the dev registry proxy
448445
workerOpts.core.serviceBindings[name] = {
449446
external: {
450-
address: `${loopbackHost}:${loopbackPort}`,
447+
address: proxyAddress,
451448
http: getHttpProxyOptions(serviceName, entrypoint),
452449
},
453450
};
@@ -516,7 +513,7 @@ function getExternalServiceEntrypoints(
516513
// Override it to connect to the dev registry proxy
517514
workerOpts.core.tails[i] = {
518515
external: {
519-
address: `${loopbackHost}:${loopbackPort}`,
516+
address: proxyAddress,
520517
http: getHttpProxyOptions(serviceName, entrypoint),
521518
},
522519
};
@@ -1096,81 +1093,10 @@ export class Miniflare {
10961093
return this.#workerOpts.map<NameSourceOptions>(({ core }) => core);
10971094
}
10981095

1099-
#getFallbackServiceAddress(
1100-
service: string,
1101-
entrypoint: string
1102-
): {
1103-
httpStyle: "host" | "proxy";
1104-
protocol: "http" | "https";
1105-
host: string;
1106-
port: number;
1107-
} {
1108-
assert(
1109-
this.#socketPorts !== undefined && this.#runtimeEntryURL !== undefined,
1110-
"Cannot resolve address for fallback service before runtime is initialised"
1111-
);
1112-
1113-
const port = this.#socketPorts.get(
1114-
getProxyFallbackServiceSocketName(service, entrypoint)
1115-
);
1116-
1117-
if (!port) {
1118-
throw new Error(
1119-
`There is no socket opened for "${service}" with the "${entrypoint}" entrypoint`
1120-
);
1121-
}
1122-
1123-
return {
1124-
httpStyle: "proxy",
1125-
protocol: getProtocol(this.#runtimeEntryURL),
1126-
host: this.#runtimeEntryURL.hostname,
1127-
port,
1128-
};
1129-
}
1130-
11311096
#handleLoopback = async (
11321097
req: http.IncomingMessage,
11331098
res?: http.ServerResponse
11341099
): Promise<Response | undefined> => {
1135-
const serviceProxyTarget = extractServiceFetchProxyTarget(req);
1136-
1137-
if (serviceProxyTarget) {
1138-
assert(res !== undefined, "No response object provided");
1139-
1140-
const address =
1141-
this.#devRegistry.getExternalServiceAddress(
1142-
serviceProxyTarget.service,
1143-
serviceProxyTarget.entrypoint
1144-
) ??
1145-
this.#getFallbackServiceAddress(
1146-
serviceProxyTarget.service,
1147-
serviceProxyTarget.entrypoint
1148-
);
1149-
1150-
this.#handleProxy(req, res, address);
1151-
return;
1152-
}
1153-
1154-
const doProxyTarget = extractDoFetchProxyTarget(req);
1155-
1156-
if (doProxyTarget) {
1157-
assert(res !== undefined, "No response object provided");
1158-
1159-
const address = this.#devRegistry.getExternalDurableObjectAddress(
1160-
doProxyTarget.scriptName,
1161-
doProxyTarget.className
1162-
);
1163-
1164-
if (!address) {
1165-
res.writeHead(503);
1166-
res.end("Service Unavailable");
1167-
return;
1168-
}
1169-
1170-
this.#handleProxy(req, res, address);
1171-
return;
1172-
}
1173-
11741100
const customNodeService =
11751101
req.headers[CoreHeaders.CUSTOM_NODE_SERVICE.toLowerCase()];
11761102
if (typeof customNodeService === "string") {
@@ -1363,109 +1289,6 @@ export class Miniflare {
13631289
await this.#writeResponse(response, res);
13641290
};
13651291

1366-
#handleLoopbackConnect = async (
1367-
req: http.IncomingMessage,
1368-
clientSocket: Duplex,
1369-
head: Buffer
1370-
) => {
1371-
try {
1372-
const connectHost = req.url;
1373-
const [serviceName, entrypoint] = connectHost?.split(":") ?? [];
1374-
const address =
1375-
this.#devRegistry.getExternalServiceAddress(serviceName, entrypoint) ??
1376-
this.#getFallbackServiceAddress(serviceName, entrypoint);
1377-
1378-
const serverSocket = net.connect(address.port, address.host, () => {
1379-
serverSocket.write(`CONNECT ${HOST_CAPNP_CONNECT} HTTP/1.1\r\n\r\n`);
1380-
1381-
// Push along any buffered bytes
1382-
if (head && head.length) {
1383-
serverSocket.write(head);
1384-
}
1385-
1386-
serverSocket.pipe(clientSocket);
1387-
clientSocket.pipe(serverSocket);
1388-
});
1389-
1390-
// Errors on either side
1391-
serverSocket.on("error", (err) => {
1392-
this.#log.error(err);
1393-
clientSocket.end();
1394-
});
1395-
clientSocket.on("error", () => serverSocket.end());
1396-
1397-
// Close the tunnel if the service is updated
1398-
// This make sure workerd will re-connect to the latest address
1399-
this.#devRegistry.subscribe(serviceName, () => {
1400-
this.#log.debug(
1401-
`Closing tunnel as service "${serviceName}" was updated`
1402-
);
1403-
clientSocket.end();
1404-
});
1405-
} catch (ex: any) {
1406-
this.#log.error(ex);
1407-
clientSocket.end();
1408-
}
1409-
};
1410-
1411-
#handleProxy = (
1412-
req: http.IncomingMessage,
1413-
res: http.ServerResponse,
1414-
target: {
1415-
protocol: "http" | "https";
1416-
host: string;
1417-
port: number;
1418-
httpStyle?: "host" | "proxy";
1419-
path?: string;
1420-
}
1421-
) => {
1422-
const headers = { ...req.headers };
1423-
let path = target.path;
1424-
1425-
if (!path) {
1426-
switch (target.httpStyle) {
1427-
case "host": {
1428-
const url = new URL(req.url ?? `http://${req.headers.host}`);
1429-
// If the target is a host, use the path from the request URL
1430-
path = url.pathname + url.search + url.hash;
1431-
headers.host = url.host;
1432-
break;
1433-
}
1434-
case "proxy": {
1435-
// If the target is a proxy, use the full request URL
1436-
path = req.url;
1437-
break;
1438-
}
1439-
}
1440-
}
1441-
1442-
const options: http.RequestOptions = {
1443-
host: target.host,
1444-
port: target.port,
1445-
method: req.method,
1446-
path,
1447-
headers,
1448-
};
1449-
1450-
// Res is optional only on websocket upgrade requests
1451-
assert(res !== undefined, "No response object provided");
1452-
const upstream = http.request(options, (upRes) => {
1453-
// Relay status and headers back to the original client
1454-
res.writeHead(upRes.statusCode ?? 500, upRes.headers);
1455-
// Pipe the response body
1456-
upRes.pipe(res);
1457-
});
1458-
1459-
// Pipe the client request body to the upstream
1460-
req.pipe(upstream);
1461-
1462-
upstream.on("error", (err) => {
1463-
this.#log.error(err);
1464-
if (!res.headersSent) res.writeHead(502);
1465-
res.end("Bad Gateway");
1466-
});
1467-
};
1468-
14691292
async #writeResponse(response: Response, res: http.ServerResponse) {
14701293
// Convert headers into Node-friendly format
14711294
const headers: http.OutgoingHttpHeaders = {};
@@ -1551,17 +1374,9 @@ export class Miniflare {
15511374

15521375
return new Promise((resolve) => {
15531376
const server = stoppable(
1554-
http.createServer(
1555-
{
1556-
// There might be no HOST header when proxying a fetch request made over service binding
1557-
// e.g. env.MY_WORKER.fetch("https://example.com")
1558-
requireHostHeader: false,
1559-
},
1560-
this.#handleLoopback
1561-
),
1377+
http.createServer(this.#handleLoopback),
15621378
/* grace */ 0
15631379
);
1564-
server.on("connect", this.#handleLoopbackConnect);
15651380
server.on("upgrade", this.#handleLoopbackUpgrade);
15661381
server.listen(0, hostname, () => resolve(server));
15671382
});
@@ -1590,8 +1405,8 @@ export class Miniflare {
15901405
}
15911406

15921407
async #assembleConfig(
1593-
loopbackHost: string,
1594-
loopbackPort: number
1408+
loopbackPort: number,
1409+
proxyAddress: string | null
15951410
): Promise<Config> {
15961411
const allPreviousWorkerOpts = this.#previousWorkerOpts;
15971412
const allWorkerOpts = this.#workerOpts;
@@ -1600,9 +1415,10 @@ export class Miniflare {
16001415
sharedOpts.core.cf = await setupCf(this.#log, sharedOpts.core.cf);
16011416
this.#cfObject = sharedOpts.core.cf;
16021417

1603-
const externalServices = this.#devRegistry.isEnabled()
1604-
? getExternalServiceEntrypoints(allWorkerOpts, loopbackHost, loopbackPort)
1605-
: null;
1418+
const externalServices =
1419+
proxyAddress !== null
1420+
? getExternalServiceEntrypoints(allWorkerOpts, proxyAddress)
1421+
: null;
16061422
const durableObjectClassNames = getDurableObjectClassNames(allWorkerOpts);
16071423
const wrappedBindingNames = getWrappedBindingNames(
16081424
allWorkerOpts,
@@ -1891,9 +1707,6 @@ export class Miniflare {
18911707
},
18921708
});
18931709
}
1894-
1895-
// Ask the dev registry to watch for this service
1896-
this.#devRegistry.subscribe(serviceName);
18971710
}
18981711

18991712
const externalObjects = Array.from(externalServices).flatMap(
@@ -1905,12 +1718,15 @@ export class Miniflare {
19051718
);
19061719
const outboundDoProxyService = createOutboundDoProxyService(
19071720
externalObjects,
1908-
`http://${loopbackHost}:${loopbackPort}`,
1721+
`http://${proxyAddress}`,
19091722
this.#devRegistry.isDurableObjectProxyEnabled()
19101723
);
19111724

19121725
assert(outboundDoProxyService.name !== undefined);
19131726
services.set(outboundDoProxyService.name, outboundDoProxyService);
1727+
1728+
// Watch the dev registry for changes
1729+
await this.#devRegistry.watch(externalServices);
19141730
}
19151731

19161732
// Expose all internal durable object with a proxy service
@@ -2014,7 +1830,8 @@ export class Miniflare {
20141830
maybeGetLocallyAccessibleHost(configuredHost) ??
20151831
getURLSafeHost(configuredHost);
20161832
const loopbackPort = await this.#getLoopbackPort();
2017-
const config = await this.#assembleConfig(loopbackHost, loopbackPort);
1833+
const proxyAddress = await this.#devRegistry.initializeProxyWorker();
1834+
const config = await this.#assembleConfig(loopbackPort, proxyAddress);
20181835
const configBuffer = serializeConfig(config);
20191836

20201837
// Get all socket names we expect to get ports for
@@ -2292,10 +2109,12 @@ export class Miniflare {
22922109

22932110
get ready(): Promise<URL> {
22942111
return this.#waitForReady().then(async (url) => {
2112+
assert(this.#socketPorts !== undefined);
2113+
2114+
// Update proxy server with the addresses of the fallback services
2115+
this.#devRegistry.configureProxyWorker(url.toString(), this.#socketPorts);
22952116
// Register all workers with the dev registry
22962117
await this.#registerWorkers(url);
2297-
// Watch for changes to the dev registry
2298-
await this.#devRegistry.watch();
22992118

23002119
return url;
23012120
});

0 commit comments

Comments
 (0)