Skip to content

Commit 75914a5

Browse files
authored
TLS->HTTP Proxy: Support outbound request body (#61)
Adds support for sending outbound chunked-encoding POST requests from PHP. Before this PR, they simply failed. ## Implementation * Without this PR, `fetchWithCorsProxy()` exhausts the body stream on the first, non-proxied request attempt, which means it the second, proxied attempt can't access the body bytes. This PR tee the request before the first attempt to make sure we still have an unread stream for the second attempt. * Without this PR, `TCPOverFetchWebsocket` ignores chunked-encoded request body. This PR detects chunked encoding and processes the body stream. * Without this PR, `TCPOverFetchWebsocket` accumulates chunked encoding lengths trailers and passes them to `fetch()`, corrupting the request body. This PR decodes the chunked-encoded stream and passes plain body bytes over to `fetch()`. * Without this PR, `createMemoizedFetch()` locked the response stream when called twice without awaiting, which caused seemingly random failures. This PR ensures `response.body.tee()` is called before the caller has a chance to call `createMemoizedFetch()` again after an event loop tick. ## Testing instructions Additional unit test is included in this PR.
1 parent 1acd783 commit 75914a5

File tree

8 files changed

+259
-47
lines changed

8 files changed

+259
-47
lines changed

packages/php-wasm/web-service-worker/src/utils.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,26 @@ export async function cloneRequest(
175175
});
176176
}
177177

178+
/**
179+
* Tee a request to ensure the body stream is not consumed
180+
* when executing or cloning the request.
181+
*
182+
* @param request
183+
* @returns
184+
*/
185+
export async function teeRequest(
186+
request: Request
187+
): Promise<[Request, Request]> {
188+
if (!request.body) {
189+
return [request, request];
190+
}
191+
const [body1, body2] = request.body.tee();
192+
return [
193+
await cloneRequest(request, { body: body1, duplex: 'half' }),
194+
await cloneRequest(request, { body: body2, duplex: 'half' }),
195+
];
196+
}
197+
178198
/**
179199
* Extracts headers from a Request as a plain key->value JS object.
180200
*
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { concatUint8Arrays } from './tls/utils';
2+
3+
/**
4+
* A TransformStream that decodes HTTP chunked transfer encoding.
5+
* Each chunk starts with the chunk size in hex followed by CRLF,
6+
* then the chunk data, then CRLF. A chunk size of 0 indicates the end.
7+
*/
8+
export class ChunkedDecoderStream extends TransformStream<
9+
Uint8Array,
10+
Uint8Array
11+
> {
12+
constructor() {
13+
let buffer = new Uint8Array(0);
14+
let state:
15+
| 'SCAN_CHUNK_SIZE'
16+
| 'SCAN_CHUNK_DATA'
17+
| 'SCAN_CHUNK_TRAILER'
18+
| 'SCAN_FINAL_CHUNK' = 'SCAN_CHUNK_SIZE';
19+
let chunkRemainingBytes = 0;
20+
21+
super({
22+
transform(chunk, controller) {
23+
// Add new chunk to buffer
24+
buffer = concatUint8Arrays([buffer, chunk]);
25+
26+
while (buffer.length > 0) {
27+
if (state === 'SCAN_CHUNK_SIZE') {
28+
// Need at least "0\r\n" (3 bytes)
29+
if (buffer.length < 3) {
30+
return;
31+
}
32+
33+
// Find the chunk size hex digits
34+
let chunkBytesNb = 0;
35+
while (chunkBytesNb < buffer.length) {
36+
const byte = buffer[chunkBytesNb];
37+
const isHexDigit =
38+
(byte >= 48 && byte <= 57) || // 0-9
39+
(byte >= 97 && byte <= 102) || // a-f
40+
(byte >= 65 && byte <= 70); // A-F
41+
if (!isHexDigit) break;
42+
chunkBytesNb++;
43+
}
44+
45+
if (chunkBytesNb === 0) {
46+
throw new Error('Invalid chunk size format');
47+
}
48+
49+
// Look for CRLF after chunk size
50+
if (
51+
buffer.length < chunkBytesNb + 2 ||
52+
buffer[chunkBytesNb] !== 13 || // \r
53+
buffer[chunkBytesNb + 1] !== 10 // \n
54+
) {
55+
return;
56+
}
57+
58+
// Parse the chunk size
59+
const chunkSizeHex = new TextDecoder().decode(
60+
buffer.slice(0, chunkBytesNb)
61+
);
62+
const chunkSize = parseInt(chunkSizeHex, 16);
63+
64+
// Remove chunk header from buffer
65+
buffer = buffer.slice(chunkBytesNb + 2);
66+
67+
if (chunkSize === 0) {
68+
state = 'SCAN_FINAL_CHUNK';
69+
controller.terminate();
70+
return;
71+
}
72+
73+
chunkRemainingBytes = chunkSize;
74+
state = 'SCAN_CHUNK_DATA';
75+
} else if (state === 'SCAN_CHUNK_DATA') {
76+
const bytesToRead = Math.min(
77+
chunkRemainingBytes,
78+
buffer.length
79+
);
80+
const data = buffer.slice(0, bytesToRead);
81+
buffer = buffer.slice(bytesToRead);
82+
chunkRemainingBytes -= bytesToRead;
83+
84+
controller.enqueue(data);
85+
86+
if (chunkRemainingBytes === 0) {
87+
state = 'SCAN_CHUNK_TRAILER';
88+
}
89+
} else if (state === 'SCAN_CHUNK_TRAILER') {
90+
if (buffer.length < 2) {
91+
return;
92+
}
93+
94+
if (buffer[0] !== 13 || buffer[1] !== 10) {
95+
// \r\n
96+
throw new Error('Expected CRLF after chunk data');
97+
}
98+
99+
buffer = buffer.slice(2);
100+
state = 'SCAN_CHUNK_SIZE';
101+
}
102+
}
103+
},
104+
});
105+
}
106+
}
Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,27 @@
1-
import { cloneRequest } from '@php-wasm/web-service-worker';
1+
import { cloneRequest, teeRequest } from '@php-wasm/web-service-worker';
22

33
export async function fetchWithCorsProxy(
44
input: RequestInfo,
55
init?: RequestInit,
66
corsProxyUrl?: string
77
): Promise<Response> {
8-
const directFetch = fetch(input, init);
8+
const requestObject =
9+
typeof input === 'string' ? new Request(input, init) : input;
910
if (!corsProxyUrl) {
10-
return directFetch;
11+
return await fetch(requestObject);
1112
}
1213

14+
// Tee the request to avoid consuming the request body stream on the initial
15+
// fetch() so that we can retry through the cors proxy.
16+
const [request1, request2] = await teeRequest(requestObject);
17+
1318
try {
14-
return await directFetch;
19+
return await fetch(request1);
1520
} catch {
16-
let newInput: string | Request;
17-
if (typeof input === 'string' || input instanceof URL) {
18-
newInput = `${corsProxyUrl}${input}`;
19-
} else if (input instanceof Request) {
20-
newInput = await cloneRequest(input, {
21-
url: `${corsProxyUrl}${input.url}`,
22-
});
23-
} else {
24-
throw new Error('Invalid input type for fetch');
25-
}
21+
const newRequest = await cloneRequest(request2, {
22+
url: `${corsProxyUrl}${requestObject.url}`,
23+
});
2624

27-
return fetch(newInput, init);
25+
return await fetch(newRequest, init);
2826
}
2927
}

packages/php-wasm/web/src/lib/tcp-over-fetch-websocket.spec.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import { TCPOverFetchWebsocket } from './tcp-over-fetch-websocket';
1+
import {
2+
TCPOverFetchWebsocket,
3+
RawBytesFetch,
4+
} from './tcp-over-fetch-websocket';
25
import express from 'express';
36
import http from 'http';
47
import { AddressInfo } from 'net';
@@ -344,6 +347,29 @@ describe('TCPOverFetchWebsocket', () => {
344347
});
345348
});
346349

350+
describe('RawBytesFetch', () => {
351+
it('parseHttpRequest should handle an transfer-encoding: chunked POST requests', async () => {
352+
const encodedBodyBytes = 'abcde';
353+
const encodedChunkedBodyBytes = `${encodedBodyBytes.length}\r\n${encodedBodyBytes}\r\n0\r\n\r\n`;
354+
const requestBytes = `POST /echo HTTP/1.1\r\nHost: playground.internal\r\ntransfer-encoding: chunked\r\n\r\n${encodedChunkedBodyBytes}`;
355+
const request = await RawBytesFetch.parseHttpRequest(
356+
new ReadableStream({
357+
start(controller) {
358+
controller.enqueue(new TextEncoder().encode(requestBytes));
359+
controller.close();
360+
},
361+
}),
362+
'playground.internal',
363+
'http'
364+
);
365+
const parsedRequestBody = await request.body?.getReader().read();
366+
const decodedRequestBody = new TextDecoder().decode(
367+
parsedRequestBody?.value
368+
);
369+
expect(decodedRequestBody).toEqual(encodedBodyBytes);
370+
});
371+
});
372+
347373
type MakeRequestOptions = {
348374
host: string;
349375
port: number;

packages/php-wasm/web/src/lib/tcp-over-fetch-websocket.ts

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { generateCertificate, GeneratedCertificate } from './tls/certificates';
4343
import { concatUint8Arrays } from './tls/utils';
4444
import { ContentTypes } from './tls/1_2/types';
4545
import { fetchWithCorsProxy } from './fetch-with-cors-proxy';
46+
import { ChunkedDecoderStream } from './chunked-decoder';
4647

4748
export type TCPOverFetchOptions = {
4849
CAroot: GeneratedCertificate;
@@ -417,7 +418,7 @@ function guessProtocol(port: number, data: Uint8Array) {
417418
return 'other';
418419
}
419420

420-
class RawBytesFetch {
421+
export class RawBytesFetch {
421422
/**
422423
* Streams a HTTP response including the status line and headers.
423424
*/
@@ -578,13 +579,24 @@ class RawBytesFetch {
578579

579580
const headersBuffer = inputBuffer.slice(0, headersEndIndex);
580581
const parsedHeaders = RawBytesFetch.parseRequestHeaders(headersBuffer);
582+
const terminationMode =
583+
parsedHeaders.headers.get('Transfer-Encoding') !== null
584+
? 'chunked'
585+
: 'content-length';
586+
const contentLength =
587+
parsedHeaders.headers.get('Content-Length') !== null
588+
? parseInt(parsedHeaders.headers.get('Content-Length')!, 10)
589+
: undefined;
581590

582591
const bodyBytes = inputBuffer.slice(
583592
headersEndIndex + 4 /* Skip \r\n\r\n */
584593
);
585594
let outboundBodyStream: ReadableStream<Uint8Array> | undefined;
586595
if (parsedHeaders.method !== 'GET') {
587596
const requestBytesReader = requestBytesStream.getReader();
597+
let seenBytes = bodyBytes.length;
598+
let last5Bytes = bodyBytes.slice(-6);
599+
const emptyChunk = new TextEncoder().encode('0\r\n\r\n');
588600
outboundBodyStream = new ReadableStream<Uint8Array>({
589601
async start(controller) {
590602
if (bodyBytes.length > 0) {
@@ -596,16 +608,47 @@ class RawBytesFetch {
596608
},
597609
async pull(controller) {
598610
const { done, value } = await requestBytesReader.read();
599-
611+
seenBytes += value?.length || 0;
600612
if (value) {
601613
controller.enqueue(value);
614+
last5Bytes = concatUint8Arrays([
615+
last5Bytes,
616+
value || new Uint8Array(),
617+
]).slice(-5);
602618
}
603-
if (done) {
619+
const shouldTerminate =
620+
done ||
621+
(terminationMode === 'content-length' &&
622+
contentLength !== undefined &&
623+
seenBytes >= contentLength) ||
624+
(terminationMode === 'chunked' &&
625+
last5Bytes.every(
626+
(byte, index) => byte === emptyChunk[index]
627+
));
628+
if (shouldTerminate) {
604629
controller.close();
605630
return;
606631
}
607632
},
608633
});
634+
635+
if (terminationMode === 'chunked') {
636+
// Strip chunked transfer encoding from the request body stream.
637+
// PHP may encode the request body with chunked transfer encoding,
638+
// giving us a stream of chunks with a size line ending in \r\n,
639+
// a body chunk, and a chunk trailer ending in \r\n.
640+
//
641+
// We must not include the chunk headers and trailers in the
642+
// transmitted data. fetch() trusts us to provide the body stream
643+
// in its original form and will pass treat the chunked encoding
644+
// artifacts as a part of the data to be transmitted to the server.
645+
// This, in turn, means sending over a corrupted request body.
646+
//
647+
// Therefore, let's just strip any chunked encoding-related bytes.
648+
outboundBodyStream = outboundBodyStream.pipeThrough(
649+
new ChunkedDecoderStream()
650+
);
651+
}
609652
}
610653

611654
/**
Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
export interface CachedFetchResponse {
2-
body: ReadableStream<Uint8Array>;
3-
responseInit: ResponseInit;
1+
export interface CacheEntry {
2+
responsePromise: Promise<Response>;
3+
unlockedBodyStream?: ReadableStream<Uint8Array>;
4+
nextResponse: () => Promise<Response>;
45
}
56

67
/**
@@ -12,32 +13,29 @@ export interface CachedFetchResponse {
1213
* @param originalFetch The fetch function to memoize. Defaults to the global fetch.
1314
*/
1415
export function createMemoizedFetch(originalFetch = fetch) {
15-
const cache: Record<
16-
string,
17-
Promise<CachedFetchResponse> | CachedFetchResponse
18-
> = {};
16+
const fetches: Record<string, CacheEntry> = {};
1917

2018
return async function memoizedFetch(url: string, options?: RequestInit) {
21-
if (!cache[url]) {
22-
// Write to cache synchronously to avoid duplicate requests.
23-
cache[url] = originalFetch(url, options).then((response) => ({
24-
body: response.body!,
25-
responseInit: {
26-
status: response.status,
27-
statusText: response.statusText,
28-
headers: response.headers,
19+
if (!fetches[url]) {
20+
fetches[url] = {
21+
responsePromise: originalFetch(url, options),
22+
async nextResponse() {
23+
// Wait for "result" to be set.
24+
const response = await fetches[url].responsePromise;
25+
const [left, right] =
26+
fetches[url].unlockedBodyStream!.tee();
27+
fetches[url].unlockedBodyStream = left;
28+
return new Response(right, {
29+
status: response.status,
30+
statusText: response.statusText,
31+
headers: response.headers,
32+
});
2933
},
30-
}));
34+
};
35+
const response = await fetches[url].responsePromise;
36+
fetches[url].unlockedBodyStream = response.body!;
3137
}
32-
const { body, responseInit } = await cache[url];
33-
// Split the response stream so that the cached one is not consumed.
34-
const [left, right] = body.tee();
35-
// Cache the "left" stream and don't use it until the next .tee().
36-
cache[url] = {
37-
body: left,
38-
responseInit,
39-
};
40-
// Return the "right" stream for consumption.
41-
return new Response(right, responseInit);
38+
39+
return fetches[url].nextResponse();
4240
};
4341
}

packages/playground/common/src/test/create-memoized-fetch.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,25 @@ describe('createMemoizedFetch', () => {
2323
expect(await response1.text()).toBe(await response2.text());
2424
expect(fetch).toHaveBeenCalledTimes(1);
2525
});
26+
27+
it('should correctly handle teeing the response stream when called twice without awaiting', async () => {
28+
const fetch = vitest.fn().mockResolvedValueOnce(
29+
new Response('hello', {
30+
status: 200,
31+
statusText: 'OK',
32+
headers: { 'Content-type': 'text/plain' },
33+
})
34+
);
35+
const memoizedFetch = createMemoizedFetch(fetch);
36+
const response1Promise = memoizedFetch('https://example.com');
37+
const response2Promise = memoizedFetch('https://example.com');
38+
const response1 = await response1Promise;
39+
const response2 = await response2Promise;
40+
expect(response1.status).toBe(response2.status);
41+
expect(response1.headers.get('Content-type')).toBe(
42+
response2.headers.get('Content-type')
43+
);
44+
expect(await response1.text()).toBe(await response2.text());
45+
expect(fetch).toHaveBeenCalledTimes(1);
46+
});
2647
});

0 commit comments

Comments
 (0)