From e03502336ed39de562054012765d1045c1f4ba32 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 10:02:32 -0600 Subject: [PATCH 1/7] basic working prototype --- src/http/httpProxy.ts | 55 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index 70e9bcc..bcc3aa2 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -4,6 +4,7 @@ import { serialize as serializeCookie } from 'cookie'; import { EventEmitter } from 'events'; import * as http from 'http'; +import * as net from 'net'; import { AzFuncSystemError, ensureErrorType } from '../errors'; import { nonNullProp } from '../utils/nonNull'; import { workerSystemLog } from '../utils/workerSystemLog'; @@ -16,13 +17,16 @@ const invocRequestEmitter = new EventEmitter(); export async function waitForProxyRequest(invocationId: string): Promise { return new Promise((resolve, _reject) => { + workerSystemLog('debug', 'Waiting for proxy request', { invocationId }); const req = requests[invocationId]; if (req) { resolve(req); + workerSystemLog('debug', 'Proxy request found', { invocationId }); delete requests[invocationId]; } else { invocRequestEmitter.once(invocationId, () => { const req = requests[invocationId]; + workerSystemLog('debug', 'Proxy request else statement'); if (req) { resolve(req); delete requests[invocationId]; @@ -34,11 +38,13 @@ export async function waitForProxyRequest(invocationId: string): Promise { + workerSystemLog('debug', 'Sending proxy response', { invocationId }); const proxyRes = nonNullProp(responses, invocationId); delete responses[invocationId]; for (const [key, val] of userRes.headers.entries()) { proxyRes.setHeader(key, val); } + workerSystemLog('debug', 'Http proxy headers set'); proxyRes.setHeader(invocationIdHeader, invocationId); proxyRes.statusCode = userRes.status; @@ -48,7 +54,9 @@ export async function sendProxyResponse(invocationId: string, userRes: HttpRespo if (userRes.body) { for await (const chunk of userRes.body.values()) { + workerSystemLog('debug', 'Writing proxy response chunk'); proxyRes.write(chunk); + workerSystemLog('debug', 'Http proxy response chunks written'); } } proxyRes.end(); @@ -86,10 +94,12 @@ function setCookies(userRes: HttpResponse, proxyRes: http.ServerResponse): void export async function setupHttpProxy(): Promise { return new Promise((resolve, reject) => { const server = http.createServer(); + workerSystemLog('debug', 'Http proxy server created'); server.on('request', (req, res) => { const invocationId = req.headers[invocationIdHeader]; if (typeof invocationId === 'string') { + workerSystemLog('debug', 'Http proxy request received', { invocationId }); requests[invocationId] = req; responses[invocationId] = res; invocRequestEmitter.emit(invocationId); @@ -103,12 +113,22 @@ export async function setupHttpProxy(): Promise { workerSystemLog('error', `Http proxy error: ${err.stack || err.message}`); }); - server.listen(() => { - const address = server.address(); - if (address !== null && typeof address === 'object') { - resolve(`http://localhost:${address.port}/`); + server.listen(0, () => { + workerSystemLog('debug', `VICTORIA: auto-assigned port: ${server.address().port}`); + + // If port is still 0, find and bind to an open port + if (server.address().port === 0) { + workerSystemLog('debug', `VICTORIA: Port 0 assigned. Finding open port.`); + findOpenPort(51929, (openPort) => { + workerSystemLog('debug', `VICTORIA: found open port: ${openPort}`); + server.close(); // Close the server + server.listen(openPort, () => { + workerSystemLog('debug', `VICTORIA: server is now listening on found open port: ${openPort}`); + }); + resolve(`http://localhost:${openPort}/`); + }); } else { - reject(new AzFuncSystemError('Unexpected server address during http proxy setup')); + resolve(`http://localhost:${server.address().port}/`); } }); @@ -117,3 +137,28 @@ export async function setupHttpProxy(): Promise { }); }); } + + +// Function to find an open port starting from a specified port +function findOpenPort(startingPort, callback) { + const server = net.createServer(); + + function tryPort(port) { + server.once('error', () => { + // If the port is unavailable, increment and try the next one + tryPort(port + 1); + }); + + server.once('listening', () => { + const port = server.address().port; + server.close(); + callback(port); + }); + + // Try binding to the given port + server.listen(port); + } + + // Start trying from the specified starting port + tryPort(startingPort); + } From 9a5d5d84298b7c94790b8d7fd6c1ca18f88f65c9 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 10:16:08 -0600 Subject: [PATCH 2/7] cleaner implementation --- src/http/httpProxy.ts | 48 +++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index bcc3aa2..9a8bd67 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -113,22 +113,26 @@ export async function setupHttpProxy(): Promise { workerSystemLog('error', `Http proxy error: ${err.stack || err.message}`); }); - server.listen(0, () => { - workerSystemLog('debug', `VICTORIA: auto-assigned port: ${server.address().port}`); - - // If port is still 0, find and bind to an open port - if (server.address().port === 0) { + server.listen(() => { + const address = server.address(); + if (address !== null && address.port === 0) { + // Auto-assigned port is 0, find and bind to an open port workerSystemLog('debug', `VICTORIA: Port 0 assigned. Finding open port.`); findOpenPort(51929, (openPort) => { workerSystemLog('debug', `VICTORIA: found open port: ${openPort}`); - server.close(); // Close the server + // Close the server and re-listen on the found open port + server.close(); server.listen(openPort, () => { workerSystemLog('debug', `VICTORIA: server is now listening on found open port: ${openPort}`); }); resolve(`http://localhost:${openPort}/`); }); + } else if (address !== null && typeof address === 'object') { + // Auto-assigned port is not 0 + workerSystemLog('debug', `VICTORIA: auto-assigned port is valid. Port: ${address.port}`); + resolve(`http://localhost:${address.port}/`); } else { - resolve(`http://localhost:${server.address().port}/`); + reject(new AzFuncSystemError('Unexpected server address during http proxy setup')); } }); @@ -138,27 +142,27 @@ export async function setupHttpProxy(): Promise { }); } - // Function to find an open port starting from a specified port function findOpenPort(startingPort, callback) { const server = net.createServer(); function tryPort(port) { - server.once('error', () => { - // If the port is unavailable, increment and try the next one - tryPort(port + 1); - }); - - server.once('listening', () => { - const port = server.address().port; - server.close(); - callback(port); - }); - - // Try binding to the given port - server.listen(port); + server.once('error', () => { + // If the port is unavailable, increment and try the next one + tryPort(port + 1); + }); + + // If the port is available, return it + server.once('listening', () => { + const port = server.address().port; + server.close(); + callback(port); + }); + + // Try binding to the given port + server.listen(port); } // Start trying from the specified starting port tryPort(startingPort); - } +} From 10a6ce33f45c2908c6b51f2f3aced6cde93e5c8f Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 10:50:30 -0600 Subject: [PATCH 3/7] typing fixes + removed logs --- src/http/httpProxy.ts | 45 ++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index 9a8bd67..dd4d26c 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -115,22 +115,24 @@ export async function setupHttpProxy(): Promise { server.listen(() => { const address = server.address(); - if (address !== null && address.port === 0) { - // Auto-assigned port is 0, find and bind to an open port - workerSystemLog('debug', `VICTORIA: Port 0 assigned. Finding open port.`); - findOpenPort(51929, (openPort) => { - workerSystemLog('debug', `VICTORIA: found open port: ${openPort}`); - // Close the server and re-listen on the found open port - server.close(); - server.listen(openPort, () => { - workerSystemLog('debug', `VICTORIA: server is now listening on found open port: ${openPort}`); + // Valid address has been created + if (address !== null && typeof address === 'object') { + if (address.port === 0) { + // Auto-assigned port is 0, find and bind to an open port + workerSystemLog('debug', `Port 0 assigned. Finding open port.`); + findOpenPort(51929, (openPort: number) => { + // Close the server and re-listen on the found open port + server.close(); + server.listen(openPort, () => { + workerSystemLog('debug', `Server is now listening on found open port: ${openPort}`); + }); + resolve(`http://localhost:${openPort}/`); }); - resolve(`http://localhost:${openPort}/`); - }); - } else if (address !== null && typeof address === 'object') { - // Auto-assigned port is not 0 - workerSystemLog('debug', `VICTORIA: auto-assigned port is valid. Port: ${address.port}`); - resolve(`http://localhost:${address.port}/`); + } else { + // Auto-assigned port is not 0 + workerSystemLog('debug', `Auto-assigned port is valid. Port: ${address.port}`); + resolve(`http://localhost:${address.port}/`); + } } else { reject(new AzFuncSystemError('Unexpected server address during http proxy setup')); } @@ -143,10 +145,10 @@ export async function setupHttpProxy(): Promise { } // Function to find an open port starting from a specified port -function findOpenPort(startingPort, callback) { +function findOpenPort(startingPort: number, callback: (port: number) => void): void { const server = net.createServer(); - function tryPort(port) { + function tryPort(port: number) { server.once('error', () => { // If the port is unavailable, increment and try the next one tryPort(port + 1); @@ -154,9 +156,12 @@ function findOpenPort(startingPort, callback) { // If the port is available, return it server.once('listening', () => { - const port = server.address().port; - server.close(); - callback(port); + const address = server.address(); + if (address !== null && typeof address === 'object') { + port = address.port; + server.close(); + callback(port); + } }); // Try binding to the given port From c933e51da89ec60410fafb42c40af42b9cb6d193 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 10:52:27 -0600 Subject: [PATCH 4/7] removed logs --- src/http/httpProxy.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index dd4d26c..9fe029e 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -17,16 +17,13 @@ const invocRequestEmitter = new EventEmitter(); export async function waitForProxyRequest(invocationId: string): Promise { return new Promise((resolve, _reject) => { - workerSystemLog('debug', 'Waiting for proxy request', { invocationId }); const req = requests[invocationId]; if (req) { resolve(req); - workerSystemLog('debug', 'Proxy request found', { invocationId }); delete requests[invocationId]; } else { invocRequestEmitter.once(invocationId, () => { const req = requests[invocationId]; - workerSystemLog('debug', 'Proxy request else statement'); if (req) { resolve(req); delete requests[invocationId]; @@ -38,13 +35,11 @@ export async function waitForProxyRequest(invocationId: string): Promise { - workerSystemLog('debug', 'Sending proxy response', { invocationId }); const proxyRes = nonNullProp(responses, invocationId); delete responses[invocationId]; for (const [key, val] of userRes.headers.entries()) { proxyRes.setHeader(key, val); } - workerSystemLog('debug', 'Http proxy headers set'); proxyRes.setHeader(invocationIdHeader, invocationId); proxyRes.statusCode = userRes.status; @@ -54,9 +49,7 @@ export async function sendProxyResponse(invocationId: string, userRes: HttpRespo if (userRes.body) { for await (const chunk of userRes.body.values()) { - workerSystemLog('debug', 'Writing proxy response chunk'); proxyRes.write(chunk); - workerSystemLog('debug', 'Http proxy response chunks written'); } } proxyRes.end(); @@ -94,12 +87,10 @@ function setCookies(userRes: HttpResponse, proxyRes: http.ServerResponse): void export async function setupHttpProxy(): Promise { return new Promise((resolve, reject) => { const server = http.createServer(); - workerSystemLog('debug', 'Http proxy server created'); server.on('request', (req, res) => { const invocationId = req.headers[invocationIdHeader]; if (typeof invocationId === 'string') { - workerSystemLog('debug', 'Http proxy request received', { invocationId }); requests[invocationId] = req; responses[invocationId] = res; invocRequestEmitter.emit(invocationId); From 90a953eba3f5ffefc82f9eb4d88373793f91d062 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 11:35:52 -0600 Subject: [PATCH 5/7] trying random port --- src/http/httpProxy.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index 9fe029e..0fc729f 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -135,14 +135,20 @@ export async function setupHttpProxy(): Promise { }); } +// Function to get a random port within a specified range +function getRandomPort(minPort = 55000, maxPort = 65535): number { + return Math.floor(Math.random() * (maxPort - minPort + 1)) + minPort; +} + // Function to find an open port starting from a specified port function findOpenPort(startingPort: number, callback: (port: number) => void): void { const server = net.createServer(); function tryPort(port: number) { server.once('error', () => { - // If the port is unavailable, increment and try the next one - tryPort(port + 1); + // If the port is unavailable, get a random port and try again + const randomPort = getRandomPort(); + tryPort(randomPort); }); // If the port is available, return it From 994dd84568a1b83636c4d1cc16f92d54ae5dd3a5 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Thu, 14 Nov 2024 11:39:54 -0600 Subject: [PATCH 6/7] port variables --- src/http/httpProxy.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index 0fc729f..8b009a8 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -12,6 +12,8 @@ import { HttpResponse } from './HttpResponse'; const requests: Record = {}; const responses: Record = {}; +const minPort = 55000; +const maxPort = 65535; const invocRequestEmitter = new EventEmitter(); @@ -111,7 +113,7 @@ export async function setupHttpProxy(): Promise { if (address.port === 0) { // Auto-assigned port is 0, find and bind to an open port workerSystemLog('debug', `Port 0 assigned. Finding open port.`); - findOpenPort(51929, (openPort: number) => { + findOpenPort((openPort: number) => { // Close the server and re-listen on the found open port server.close(); server.listen(openPort, () => { @@ -136,12 +138,12 @@ export async function setupHttpProxy(): Promise { } // Function to get a random port within a specified range -function getRandomPort(minPort = 55000, maxPort = 65535): number { +function getRandomPort(): number { return Math.floor(Math.random() * (maxPort - minPort + 1)) + minPort; } // Function to find an open port starting from a specified port -function findOpenPort(startingPort: number, callback: (port: number) => void): void { +function findOpenPort(callback: (port: number) => void): void { const server = net.createServer(); function tryPort(port: number) { @@ -166,5 +168,5 @@ function findOpenPort(startingPort: number, callback: (port: number) => void): v } // Start trying from the specified starting port - tryPort(startingPort); + tryPort(minPort); } From 701161564d355b3c59d05824e1f5470ee32e6a82 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Wed, 18 Dec 2024 11:36:19 -0600 Subject: [PATCH 7/7] checking only 25 ports --- src/http/httpProxy.ts | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/http/httpProxy.ts b/src/http/httpProxy.ts index 8b009a8..b1c683b 100644 --- a/src/http/httpProxy.ts +++ b/src/http/httpProxy.ts @@ -13,7 +13,7 @@ import { HttpResponse } from './HttpResponse'; const requests: Record = {}; const responses: Record = {}; const minPort = 55000; -const maxPort = 65535; +const maxPort = 55025; const invocRequestEmitter = new EventEmitter(); @@ -137,20 +137,21 @@ export async function setupHttpProxy(): Promise { }); } -// Function to get a random port within a specified range -function getRandomPort(): number { - return Math.floor(Math.random() * (maxPort - minPort + 1)) + minPort; -} - // Function to find an open port starting from a specified port function findOpenPort(callback: (port: number) => void): void { const server = net.createServer(); function tryPort(port: number) { + if (port > maxPort) { + // If we've reached the maximum port, throw an error + throw new AzFuncSystemError( + `No available ports found between ${minPort} and ${maxPort}. To enable HTTP streaming, please open a port in this range.` + ); + } + server.once('error', () => { - // If the port is unavailable, get a random port and try again - const randomPort = getRandomPort(); - tryPort(randomPort); + // If the port is unavailable, increment and try the next one + tryPort(port + 1); }); // If the port is available, return it