Skip to content

Implement GH-8641: [Stream] STREAM_NOTIFY_COMPLETED over HTTP never emitted #10505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions ext/standard/tests/http/gh8641.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
--TEST--
GH-8641 ([Stream] STREAM_NOTIFY_COMPLETED over HTTP never emitted)
--SKIPIF--
<?php require 'server.inc'; http_server_skipif(); ?>
--INI--
allow_url_fopen=1
--FILE--
<?php
require 'server.inc';

function stream_notification_callback($notification_code, $severity, $message, $message_code, $bytes_transferred, $bytes_max)
{
if ($notification_code === STREAM_NOTIFY_COMPLETED) {
echo $notification_code, ' ', $bytes_transferred, ' ', $bytes_max, PHP_EOL;
}
}

$ctx = stream_context_create();
stream_context_set_params($ctx, array("notification" => "stream_notification_callback"));

$responses = array(
"data://text/plain,HTTP/1.0 200 Ok\r\nContent-Length: 11\r\n\r\nHello world",
);

['pid' => $pid, 'uri' => $uri] = http_server($responses, $output);

$f = file_get_contents($uri, 0, $ctx);

http_server_kill($pid);
var_dump($f);
?>
--EXPECTF--
8 11 11
string(11) "Hello world"
4 changes: 4 additions & 0 deletions main/streams/php_stream_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ END_EXTERN_C()
php_stream_notification_notify((context), PHP_STREAM_NOTIFY_PROGRESS, PHP_STREAM_NOTIFY_SEVERITY_INFO, \
NULL, 0, (bsofar), (bmax), NULL); } } while(0)

#define php_stream_notify_completed(context) do { if ((context) && (context)->notifier) { \
php_stream_notification_notify((context), PHP_STREAM_NOTIFY_COMPLETED, PHP_STREAM_NOTIFY_SEVERITY_INFO, \
NULL, 0, (context)->notifier->progress, (context)->notifier->progress_max, NULL); } } while(0)

#define php_stream_notify_progress_init(context, sofar, bmax) do { if ((context) && (context)->notifier) { \
(context)->notifier->progress = (sofar); \
(context)->notifier->progress_max = (bmax); \
Expand Down
39 changes: 32 additions & 7 deletions main/streams/streams.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size)
{
/* allocate/fill the buffer */

zend_result retval;
bool old_eof = stream->eof;

if (stream->readfilters.head) {
size_t to_read_now = MIN(size, stream->chunk_size);
char *chunk_buf;
Expand All @@ -562,7 +565,8 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size)
justread = stream->ops->read(stream, chunk_buf, stream->chunk_size);
if (justread < 0 && stream->writepos == stream->readpos) {
efree(chunk_buf);
return FAILURE;
retval = FAILURE;
goto out_check_eof;
} else if (justread > 0) {
bucket = php_stream_bucket_new(stream, chunk_buf, justread, 0, 0);

Expand Down Expand Up @@ -633,7 +637,8 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size)
* further reads should fail. */
stream->eof = 1;
efree(chunk_buf);
return FAILURE;
retval = FAILURE;
goto out_is_eof;
}

if (justread <= 0) {
Expand All @@ -643,7 +648,6 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size)

efree(chunk_buf);
return SUCCESS;

} else {
/* is there enough data in the buffer ? */
if (stream->writepos - stream->readpos < (zend_off_t)size) {
Expand All @@ -670,12 +674,22 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size)
stream->readbuflen - stream->writepos
);
if (justread < 0) {
return FAILURE;
retval = FAILURE;
goto out_check_eof;
}
stream->writepos += justread;
retval = SUCCESS;
goto out_check_eof;
}
return SUCCESS;
}

out_check_eof:
if (old_eof != stream->eof) {
out_is_eof:
php_stream_notify_completed(PHP_STREAM_CONTEXT(stream));
}
return retval;
}

PHPAPI ssize_t _php_stream_read(php_stream *stream, char *buf, size_t size)
Expand Down Expand Up @@ -1124,6 +1138,7 @@ PHPAPI zend_string *php_stream_get_record(php_stream *stream, size_t maxlen, con
static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, size_t count)
{
ssize_t didwrite = 0;
ssize_t retval;

/* if we have a seekable stream we need to ensure that data is written at the
* current stream->position. This means invalidating the read buffer and then
Expand All @@ -1134,15 +1149,19 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz
stream->ops->seek(stream, stream->position, SEEK_SET, &stream->position);
}

bool old_eof = stream->eof;

while (count > 0) {
ssize_t justwrote = stream->ops->write(stream, buf, count);
if (justwrote <= 0) {
/* If we already successfully wrote some bytes and a write error occurred
* later, report the successfully written bytes. */
if (didwrite == 0) {
return justwrote;
retval = justwrote;
goto out;
}
return didwrite;
retval = didwrite;
goto out;
}

buf += justwrote;
Expand All @@ -1151,7 +1170,13 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz
stream->position += justwrote;
}

return didwrite;
retval = didwrite;

out:
if (old_eof != stream->eof) {
php_stream_notify_completed(PHP_STREAM_CONTEXT(stream));
}
return retval;
}

/* push some data through the write filter chain.
Expand Down