Skip to content

Commit 4d62b77

Browse files
committed
TransformStream cleanup using "Transformer.cancel"
This commit adds a "cancel" hook to "Transformer". This allows users to perform resource cleanup when the readable side of the TransformStream is cancelled, or the writable side is aborted. To preserve existing behavior, when the readable side is cancelled with a reason, the writable side is always immediately aborted with that same reason. The same is true in the reverse case. This means that the status of both sides is always either "closed", "erroring", or "erroring" when the "cancel" hook is called. "flush" and "cancel" are never both called. As per existing behaviour, when the writable side is closed the "flush" hook is called. If the readable side is cancelled while a promise returned from "flush" is still pending, "cancel" is not called. In this scenario the readable side ends up in the "errored" state, while the writable side ends up in the "closed" state.
1 parent 8d7a0bf commit 4d62b77

File tree

3 files changed

+133
-21
lines changed

3 files changed

+133
-21
lines changed

index.bs

Lines changed: 88 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5489,13 +5489,15 @@ dictionary Transformer {
54895489
TransformerStartCallback start;
54905490
TransformerTransformCallback transform;
54915491
TransformerFlushCallback flush;
5492+
TransformerCancelCallback cancel;
54925493
any readableType;
54935494
any writableType;
54945495
};
54955496

54965497
callback TransformerStartCallback = any (TransformStreamDefaultController controller);
54975498
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
54985499
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
5500+
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);
54995501
</xmp>
55005502

55015503
<dl>
@@ -5558,6 +5560,25 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
55585560
{{Transformer/flush|flush()}}; the stream is already in the process of successfully closing down,
55595561
and terminating it would be counterproductive.)
55605562

5563+
<dt><dfn dict-member for="Transformer" lt="cancel">cancel(<var ignore>reason</var>, <var ignore>controller</var>)</dfn></dt>
5564+
<dd>
5565+
<p>A function called when the [=writable side=] is aborted, or when the [=readable side=] is
5566+
cancelled.
5567+
5568+
<p>Typically this is used to clean up underlying transformer resources when the stream is
5569+
aborted or cancelled.
5570+
5571+
<p>If the cancellation process is asynchronous, the function can return a promise to signal
5572+
success or failure; the result will be communicated to the caller of
5573+
{{WritableStream/abort()|stream.writable.abort()}} or
5574+
{{ReadableStream/cancel()|stream.readable.cancel()}}. Throwing an exception is treated the same
5575+
as returning a rejected promise.
5576+
5577+
<p>(Note that there is no need to call
5578+
{{TransformStreamDefaultController/terminate()|controller.terminate()}} inside
5579+
{{Transformer/cancel|cancel()}}; the stream is already in the process of cancelling/aborting, and
5580+
terminating it would be counterproductive.)
5581+
55615582
<dt><dfn dict-member for="Transformer">readableType</dfn></dt>
55625583
<dd>
55635584
<p>This property is reserved for future use, so any attempts to supply a value will throw an
@@ -5570,9 +5591,9 @@ callback TransformerTransformCallback = Promise<undefined> (any chunk, Transform
55705591
</dl>
55715592

55725593
The <code>controller</code> object passed to {{Transformer/start|start()}},
5573-
{{Transformer/transform|transform()}}, and {{Transformer/flush|flush()}} is an instance of
5574-
{{TransformStreamDefaultController}}, and has the ability to enqueue [=chunks=] to the [=readable
5575-
side=], or to terminate or error the stream.
5594+
{{Transformer/transform|transform()}}, {{Transformer/flush|flush()}}, and
5595+
{{Transformer/cancel|cancel()}} is an instance of {{TransformStreamDefaultController}}, and has the
5596+
ability to enqueue [=chunks=] to the [=readable side=], or to terminate or error the stream.
55765597

55775598
<h4 id="ts-prototype">Constructor and properties</h4>
55785599

@@ -5726,6 +5747,10 @@ the following table:
57265747
<th>Internal Slot</th>
57275748
<th>Description (<em>non-normative</em>)</th>
57285749
<tbody>
5750+
<tr>
5751+
<td><dfn>\[[cancelAlgorithm]]</dfn>
5752+
<td class="non-normative">A promise-returning algorithm, taking one argument (the [=reason=] for
5753+
cancellation), which communicates a requested cancellation to the [=transformer=]
57295754
<tr>
57305755
<td><dfn>\[[flushAlgorithm]]</dfn>
57315756
<td class="non-normative">A promise-returning algorithm which communicates a requested close to
@@ -5819,8 +5844,7 @@ The following abstract operations operate on {{TransformStream}} instances at a
58195844
1. Let |pullAlgorithm| be the following steps:
58205845
1. Return ! [$TransformStreamDefaultSourcePullAlgorithm$](|stream|).
58215846
1. Let |cancelAlgorithm| be the following steps, taking a |reason| argument:
5822-
1. Perform ! [$TransformStreamErrorWritableAndUnblockWrite$](|stream|, |reason|).
5823-
1. Return [=a promise resolved with=] undefined.
5847+
1. Return ! [$TransformStreamDefaultSourceCancelAlgorithm$](|stream|, |reason|).
58245848
1. Set |stream|.[=TransformStream/[[readable]]=] to ! [$CreateReadableStream$](|startAlgorithm|,
58255849
|pullAlgorithm|, |cancelAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
58265850
1. Set |stream|.[=TransformStream/[[backpressure]]=] and
@@ -5854,6 +5878,14 @@ The following abstract operations operate on {{TransformStream}} instances at a
58545878
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|stream|.[=TransformStream/[[controller]]=]).
58555879
1. Perform !
58565880
[$WritableStreamDefaultControllerErrorIfNeeded$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=], |e|).
5881+
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
5882+
</div>
5883+
5884+
<div algorithm>
5885+
<dfn abstract-op lt="TransformStreamUnblockWrite"
5886+
id="transform-stream-unblock-write">TransformStreamUnblockWrite(|stream|)</dfn> performs the
5887+
following steps:
5888+
58575889
1. If |stream|.[=TransformStream/[[backpressure]]=] is true, perform ! [$TransformStreamSetBackpressure$](|stream|,
58585890
false).
58595891

@@ -5882,7 +5914,8 @@ The following abstract operations support the implementaiton of the
58825914
<div algorithm>
58835915
<dfn abstract-op lt="SetUpTransformStreamDefaultController"
58845916
id="set-up-transform-stream-default-controller">SetUpTransformStreamDefaultController(|stream|,
5885-
|controller|, |transformAlgorithm|, |flushAlgorithm|)</dfn> performs the following steps:
5917+
|controller|, |transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|)</dfn> performs the
5918+
following steps:
58865919

58875920
1. Assert: |stream| [=implements=] {{TransformStream}}.
58885921
1. Assert: |stream|.[=TransformStream/[[controller]]=] is undefined.
@@ -5891,6 +5924,7 @@ The following abstract operations support the implementaiton of the
58915924
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to
58925925
|transformAlgorithm|.
58935926
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to |flushAlgorithm|.
5927+
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to |cancelAlgorithm|.
58945928
</div>
58955929

58965930
<div algorithm>
@@ -5904,15 +5938,20 @@ The following abstract operations support the implementaiton of the
59045938
1. If |result| is an abrupt completion, return [=a promise rejected with=] |result|.\[[Value]].
59055939
1. Otherwise, return [=a promise resolved with=] undefined.
59065940
1. Let |flushAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
5941+
1. Let |cancelAlgorithm| be an algorithm which returns [=a promise resolved with=] undefined.
59075942
1. If |transformerDict|["{{Transformer/transform}}"] [=map/exists=], set |transformAlgorithm| to an
59085943
algorithm which takes an argument |chunk| and returns the result of [=invoking=]
59095944
|transformerDict|["{{Transformer/transform}}"] with argument list «&nbsp;|chunk|,
59105945
|controller|&nbsp;» and [=callback this value=] |transformer|.
59115946
1. If |transformerDict|["{{Transformer/flush}}"] [=map/exists=], set |flushAlgorithm| to an
59125947
algorithm which returns the result of [=invoking=] |transformerDict|["{{Transformer/flush}}"]
59135948
with argument list «&nbsp;|controller|&nbsp;» and [=callback this value=] |transformer|.
5949+
1. If |transformerDict|["{{Transformer/cancel}}"] [=map/exists=], set |cancelAlgorithm| to an
5950+
algorithm which takes an argument |reason| and returns the result of [=invoking=]
5951+
|transformerDict|["{{Transformer/cancel}}"] with argument list «&nbsp;|reason|,
5952+
|controller|&nbsp;» and [=callback this value=] |transformer|.
59145953
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
5915-
|transformAlgorithm|, |flushAlgorithm|).
5954+
|transformAlgorithm|, |flushAlgorithm|, |cancelAlgorithm|).
59165955
</div>
59175956

59185957
<div algorithm>
@@ -5931,6 +5970,7 @@ The following abstract operations support the implementaiton of the
59315970

59325971
1. Set |controller|.[=TransformStreamDefaultController/[[transformAlgorithm]]=] to undefined.
59335972
1. Set |controller|.[=TransformStreamDefaultController/[[flushAlgorithm]]=] to undefined.
5973+
1. Set |controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=] to undefined.
59345974
</div>
59355975

59365976
<div algorithm>
@@ -6021,8 +6061,13 @@ side=] of [=transform streams=].
60216061
id="transform-stream-default-sink-abort-algorithm">TransformStreamDefaultSinkAbortAlgorithm(|stream|,
60226062
|reason|)</dfn> performs the following steps:
60236063

6024-
1. Perform ! [$TransformStreamError$](|stream|, |reason|).
6025-
1. Return [=a promise resolved with=] undefined.
6064+
1. Let |readable| be |stream|.[=TransformStream/[[readable]]=].
6065+
1. [$ReadableStreamDefaultControllerError$](|readable|.[=ReadableStream/[[controller]]=], |reason|).
6066+
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
6067+
1. Let |cancelPromise| be the result of performing
6068+
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
6069+
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
6070+
1. Return |cancelPromise|.
60266071
</div>
60276072

60286073
<div algorithm>
@@ -6062,6 +6107,30 @@ side=] of [=transform streams=].
60626107
1. Return |stream|.[=TransformStream/[[backpressureChangePromise]]=].
60636108
</div>
60646109

6110+
6111+
<div algorithm>
6112+
<dfn abstract-op lt="TransformStreamDefaultSourceCancelAlgorithm"
6113+
id="transform-stream-default-source-cancel">TransformStreamDefaultSourceCancelAlgorithm(|reason|,
6114+
|stream|)</dfn> performs the following steps:
6115+
6116+
1. Let |writable| be |stream|.[=TransformStream/[[writable]]=].
6117+
1. If |writable|.[=WritableStream/[[state]]=] is not "`writable`", return
6118+
[=a promise resolved with=] undefined.
6119+
1. Perform !
6120+
[$WritableStreamDefaultControllerErrorIfNeeded$](|writable|.[=WritableStream/[[controller]]=], |reason|).
6121+
1. Perform ! [$TransformStreamUnblockWrite$](|stream|).
6122+
1. Let |controller| be |stream|.[=TransformStream/[[controller]]=].
6123+
1. Let |cancelPromise| be the result of performing
6124+
|controller|.[=TransformStreamDefaultController/[[cancelAlgorithm]]=], passing |reason|.
6125+
1. Perform ! [$TransformStreamDefaultControllerClearAlgorithms$](|controller|).
6126+
1. Return |cancelPromise|.
6127+
6128+
<p class="note">The early return prevents the cancellation algorithm from being called if the
6129+
writable side is already (in the process of being) closed. This is important, because the
6130+
cancellation algorithm <span class=allow-2119>must</span> not run if the flush algorithm has
6131+
already been run.
6132+
</div>
6133+
60656134
<h2 id="qs">Queuing strategies</h2>
60666135

60676136
<h3 id="qs-api">The queuing strategy API</h3>
@@ -7106,9 +7175,11 @@ reason.
71067175
<div algorithm="create a TransformStream">
71077176
To <dfn export for="TransformStream" lt="set up|setting up">set up</dfn> a
71087177
newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm <dfn export
7109-
for="TransformStream/set up"><var>transformAlgorithm</var></dfn> and an optional algorithm <dfn
7110-
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, perform the following steps.
7111-
|transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise.
7178+
for="TransformStream/set up"><var>transformAlgorithm</var></dfn>, an optional algorithm <dfn
7179+
export for="TransformStream/set up"><var>flushAlgorithm</var></dfn>, and an optional algorithm <dfn
7180+
export for="TransformStream/set up"><var>cancelAlgorithm</var></dfn>, perform the following steps.
7181+
|transformAlgorithm|, if given, |flushAlgorithm|, and, if given, |cancelAlgorithm|, may return a
7182+
promise.
71127183

71137184
1. Let |writableHighWaterMark| be 1.
71147185
1. Let |writableSizeAlgorithm| be an algorithm that returns 1.
@@ -7124,12 +7195,16 @@ reason.
71247195
null otherwise. If this throws an exception |e|, return [=a promise rejected with=] |e|.
71257196
1. If |result| is a {{Promise}}, then return |result|.
71267197
1. Return [=a promise resolved with=] undefined.
7198+
1. Let |cancelAlgorithmWrapper| be an algorithm that runs these steps given a value |reason|:
7199+
1. Let |result| be the result of running |cancelAlgorithm| given |reason|, if |cancelAlgorithm|
7200+
was given, or null otherwise. If this throws an exception |e|, return
7201+
[=a promise rejected with=] |e|.
71277202
1. Let |startPromise| be [=a promise resolved with=] undefined.
71287203
1. Perform ! [$InitializeTransformStream$](|stream|, |startPromise|, |writableHighWaterMark|,
71297204
|writableSizeAlgorithm|, |readableHighWaterMark|, |readableSizeAlgorithm|).
71307205
1. Let |controller| be a [=new=] {{TransformStreamDefaultController}}.
71317206
1. Perform ! [$SetUpTransformStreamDefaultController$](|stream|, |controller|,
7132-
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|).
7207+
|transformAlgorithmWrapper|, |flushAlgorithmWrapper|, |cancelAlgorithmWrapper|).
71337208

71347209
Other specifications should be careful when constructing their
71357210
<i>[=TransformStream/set up/transformAlgorithm=]</i> to avoid [=in parallel=] reads from the given

reference-implementation/lib/Transformer.webidl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ dictionary Transformer {
22
TransformerStartCallback start;
33
TransformerTransformCallback transform;
44
TransformerFlushCallback flush;
5+
TransformerCancelCallback cancel;
56
any readableType;
67
any writableType;
78
};
89

910
callback TransformerStartCallback = any (TransformStreamDefaultController controller);
1011
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
1112
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
13+
callback TransformerCancelCallback = Promise<undefined> (any reason, TransformStreamDefaultController controller);

reference-implementation/lib/abstract-ops/transform-streams.js

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ function InitializeTransformStream(
5151
}
5252

5353
function cancelAlgorithm(reason) {
54-
TransformStreamErrorWritableAndUnblockWrite(stream, reason);
55-
return promiseResolvedWith(undefined);
54+
return TransformStreamDefaultSourceCancelAlgorithm(stream, reason);
5655
}
5756

5857
stream._readable = CreateReadableStream(
@@ -77,6 +76,10 @@ function TransformStreamError(stream, e) {
7776
function TransformStreamErrorWritableAndUnblockWrite(stream, e) {
7877
TransformStreamDefaultControllerClearAlgorithms(stream._controller);
7978
WritableStreamDefaultControllerErrorIfNeeded(stream._writable._controller, e);
79+
TransformStreamUnblockWrite(stream);
80+
}
81+
82+
function TransformStreamUnblockWrite(stream) {
8083
if (stream._backpressure === true) {
8184
// Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure()
8285
// cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time
@@ -102,7 +105,8 @@ function TransformStreamSetBackpressure(stream, backpressure) {
102105

103106
// Default controllers
104107

105-
function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm) {
108+
function SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm,
109+
cancelAlgorithm) {
106110
assert(TransformStream.isImpl(stream));
107111
assert(stream._controller === undefined);
108112

@@ -111,6 +115,7 @@ function SetUpTransformStreamDefaultController(stream, controller, transformAlgo
111115

112116
controller._transformAlgorithm = transformAlgorithm;
113117
controller._flushAlgorithm = flushAlgorithm;
118+
controller._cancelAlgorithm = cancelAlgorithm;
114119
}
115120

116121
function SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) {
@@ -126,20 +131,25 @@ function SetUpTransformStreamDefaultControllerFromTransformer(stream, transforme
126131
};
127132

128133
let flushAlgorithm = () => promiseResolvedWith(undefined);
134+
let cancelAlgorithm = () => promiseResolvedWith(undefined);
129135

130136
if ('transform' in transformerDict) {
131137
transformAlgorithm = chunk => transformerDict.transform.call(transformer, chunk, controller);
132138
}
133139
if ('flush' in transformerDict) {
134140
flushAlgorithm = () => transformerDict.flush.call(transformer, controller);
135141
}
142+
if ('cancel' in transformerDict) {
143+
cancelAlgorithm = reason => transformerDict.cancel.call(transformer, reason, controller);
144+
}
136145

137-
SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm);
146+
SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm);
138147
}
139148

140149
function TransformStreamDefaultControllerClearAlgorithms(controller) {
141150
controller._transformAlgorithm = undefined;
142151
controller._flushAlgorithm = undefined;
152+
controller._cancelAlgorithm = undefined;
143153
}
144154

145155
function TransformStreamDefaultControllerEnqueue(controller, chunk) {
@@ -221,10 +231,17 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
221231
}
222232

223233
function TransformStreamDefaultSinkAbortAlgorithm(stream, reason) {
224-
// abort() is not called synchronously, so it is possible for abort() to be called when the stream is already
225-
// errored.
226-
TransformStreamError(stream, reason);
227-
return promiseResolvedWith(undefined);
234+
verbose('TransformStreamDefaultSinkAbortAlgorithm()');
235+
236+
// stream._readable cannot change after construction, so caching it across a call to user code is safe.
237+
const readable = stream._readable;
238+
ReadableStreamDefaultControllerError(readable._controller, reason);
239+
240+
const controller = stream._controller;
241+
const cancelPromise = controller._cancelAlgorithm(reason);
242+
TransformStreamDefaultControllerClearAlgorithms(controller);
243+
244+
return cancelPromise;
228245
}
229246

230247
function TransformStreamDefaultSinkCloseAlgorithm(stream) {
@@ -264,3 +281,21 @@ function TransformStreamDefaultSourcePullAlgorithm(stream) {
264281
// Prevent the next pull() call until there is backpressure.
265282
return stream._backpressureChangePromise;
266283
}
284+
285+
function TransformStreamDefaultSourceCancelAlgorithm(stream, reason) {
286+
verbose('TransformStreamDefaultSourceCancelAlgorithm()');
287+
288+
// stream._writable cannot change after construction, so caching it across a call to user code is safe.
289+
const writable = stream._writable;
290+
if (writable._state !== 'writable') {
291+
return promiseResolvedWith(undefined);
292+
}
293+
WritableStreamDefaultControllerErrorIfNeeded(writable._controller, reason);
294+
TransformStreamUnblockWrite(stream);
295+
296+
const controller = stream._controller;
297+
const cancelPromise = controller._cancelAlgorithm(reason);
298+
TransformStreamDefaultControllerClearAlgorithms(controller);
299+
300+
return cancelPromise;
301+
}

0 commit comments

Comments
 (0)