13
13
14
14
use Amp \ByteStream \StreamException ;
15
15
use Amp \DeferredCancellation ;
16
+ use Amp \DeferredFuture ;
17
+ use Amp \Future ;
16
18
use Amp \Http \Client \HttpException ;
17
19
use Amp \Http \Client \Request ;
18
20
use Amp \Http \Client \Response ;
19
21
use Psr \Log \LoggerInterface ;
20
- use Revolt \EventLoop ;
21
- use Revolt \EventLoop \Suspension ;
22
22
use Symfony \Component \HttpClient \Chunk \FirstChunk ;
23
23
use Symfony \Component \HttpClient \Chunk \InformationalChunk ;
24
24
use Symfony \Component \HttpClient \Exception \InvalidArgumentException ;
@@ -196,7 +196,7 @@ private static function generateResponse(Request $request, AmpClientState $multi
196
196
});
197
197
198
198
try {
199
- if (null === $ response = self ::getPushedResponse ($ request , $ multi , $ info , $ headers , $ options , $ logger )) {
199
+ if (null === $ response = self ::getPushedResponse ($ request , $ multi , $ info , $ headers , $ canceller , $ options , $ logger )) {
200
200
$ logger ?->info(sprintf ('Request: "%s %s" ' , $ info ['http_method ' ], $ info ['url ' ]));
201
201
202
202
$ response = self ::followRedirects ($ request , $ multi , $ info , $ headers , $ canceller , $ options , $ onProgress , $ handle , $ logger , $ pause );
@@ -364,15 +364,16 @@ private static function addResponseHeaders(Response $response, array &$info, arr
364
364
/**
365
365
* Accepts pushed responses only if their headers related to authentication match the request.
366
366
*/
367
- private static function getPushedResponse (Request $ request , AmpClientState $ multi , array &$ info , array &$ headers , array $ options , ?LoggerInterface $ logger ): ?Response
367
+ private static function getPushedResponse (Request $ request , AmpClientState $ multi , array &$ info , array &$ headers , DeferredCancellation $ canceller , array $ options , ?LoggerInterface $ logger ): ?Response
368
368
{
369
369
if ('' !== $ options ['body ' ]) {
370
370
return null ;
371
371
}
372
372
373
373
$ authority = $ request ->getUri ()->getAuthority ();
374
+ $ cancellation = $ canceller ->getCancellation ();
374
375
375
- foreach ($ multi ->pushedResponses [$ authority ] ?? [] as $ i => [$ pushedUrl , $ pushedResponse , $ pushedRequest , $ parentOptions ]) {
376
+ foreach ($ multi ->pushedResponses [$ authority ] ?? [] as $ i => [$ pushedUrl , $ pushDeferred , $ pushedRequest, $ pushedResponse , $ parentOptions ]) {
376
377
if ($ info ['url ' ] !== $ pushedUrl || $ info ['http_method ' ] !== $ pushedRequest ->getMethod ()) {
377
378
continue ;
378
379
}
@@ -383,13 +384,23 @@ private static function getPushedResponse(Request $request, AmpClientState $mult
383
384
}
384
385
}
385
386
387
+ /** @var DeferredFuture $pushDeferred */
388
+ $ id = $ cancellation ->subscribe (static fn ($ e ) => $ pushDeferred ->error ($ e ));
389
+
390
+ try {
391
+ /** @var Future $pushedResponse */
392
+ $ response = $ pushedResponse ->await ($ cancellation );
393
+ } finally {
394
+ $ cancellation ->unsubscribe ($ id );
395
+ }
396
+
386
397
foreach (['authorization ' , 'cookie ' , 'range ' , 'proxy-authorization ' ] as $ k ) {
387
- if ($ pushedRequest ->getHeaderArray ($ k ) !== $ request ->getHeaderArray ($ k )) {
398
+ if ($ response ->getHeaderArray ($ k ) !== $ request ->getHeaderArray ($ k )) {
388
399
continue 2 ;
389
400
}
390
401
}
391
402
392
- foreach ($ pushedResponse ->getHeaderArray ('vary ' ) as $ vary ) {
403
+ foreach ($ response ->getHeaderArray ('vary ' ) as $ vary ) {
393
404
foreach (preg_split ('/\s*+,\s*+/ ' , $ vary ) as $ v ) {
394
405
if ('* ' === $ v || ($ pushedRequest ->getHeaderArray ($ v ) !== $ request ->getHeaderArray ($ v ) && 'accept-encoding ' !== strtolower ($ v ))) {
395
406
$ logger ?->debug(sprintf ('Skipping pushed response: "%s" ' , $ info ['url ' ]));
@@ -398,8 +409,9 @@ private static function getPushedResponse(Request $request, AmpClientState $mult
398
409
}
399
410
}
400
411
412
+ $ pushDeferred ->complete ();
401
413
$ logger ?->debug(sprintf ('Accepting pushed response: "%s %s" ' , $ info ['http_method ' ], $ info ['url ' ]));
402
- self ::addResponseHeaders ($ pushedResponse , $ info , $ headers );
414
+ self ::addResponseHeaders ($ response , $ info , $ headers );
403
415
unset($ multi ->pushedResponses [$ authority ][$ i ]);
404
416
405
417
if (!$ multi ->pushedResponses [$ authority ]) {
0 commit comments