From 38ee90a0ef4f6d5c86beeaf61bdfef50967241a7 Mon Sep 17 00:00:00 2001 From: Niels Dossche <7771979+nielsdos@users.noreply.github.com> Date: Sat, 4 Feb 2023 02:12:49 +0100 Subject: [PATCH 1/3] Implement GH-8641: [Stream] STREAM_NOTIFY_COMPLETED over HTTP never emitted This adds support for the completed event. Since the read handler could be entered twice towards the end of the stream we remember what the eof flag was before reading so we can emit the completed event when the flag changes to true. --- ext/openssl/xp_ssl.c | 4 ++++ ext/standard/tests/http/gh8641.phpt | 34 +++++++++++++++++++++++++++++ main/streams/php_stream_context.h | 4 ++++ main/streams/xp_socket.c | 7 +++++- 4 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 ext/standard/tests/http/gh8641.phpt diff --git a/ext/openssl/xp_ssl.c b/ext/openssl/xp_ssl.c index 7b604be043a5c..2b6141008c6af 100644 --- a/ext/openssl/xp_ssl.c +++ b/ext/openssl/xp_ssl.c @@ -2035,6 +2035,7 @@ static ssize_t php_openssl_sockop_io(int read, php_stream *stream, char *buf, si } /* Main IO loop. */ + bool old_eof = stream->eof; do { struct timeval cur_time, elapsed_time, left_time; @@ -2143,6 +2144,9 @@ static ssize_t php_openssl_sockop_io(int read, php_stream *stream, char *buf, si if (nr_bytes > 0) { php_stream_notify_progress_increment(PHP_STREAM_CONTEXT(stream), nr_bytes, 0); } + if (old_eof != stream->eof) { + php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); + } /* And if we were originally supposed to be blocking, let's reset the socket to that. */ if (began_blocked && php_set_sock_blocking(sslsock->s.socket, 1) == SUCCESS) { diff --git a/ext/standard/tests/http/gh8641.phpt b/ext/standard/tests/http/gh8641.phpt new file mode 100644 index 0000000000000..9ccedc443dfec --- /dev/null +++ b/ext/standard/tests/http/gh8641.phpt @@ -0,0 +1,34 @@ +--TEST-- +GH-8641 ([Stream] STREAM_NOTIFY_COMPLETED over HTTP never emitted) +--SKIPIF-- + +--INI-- +allow_url_fopen=1 +--FILE-- + "stream_notification_callback")); + +$responses = array( + "data://text/plain,HTTP/1.0 200 Ok\r\nContent-Length: 11\r\n\r\nHello world", +); + +['pid' => $pid, 'uri' => $uri] = http_server($responses, $output); + +$f = file_get_contents($uri, 0, $ctx); + +http_server_kill($pid); +var_dump($f); +?> +--EXPECTF-- +8 11 11 +string(11) "Hello world" diff --git a/main/streams/php_stream_context.h b/main/streams/php_stream_context.h index c98f5420ac3e6..d4ebe29bc162e 100644 --- a/main/streams/php_stream_context.h +++ b/main/streams/php_stream_context.h @@ -94,6 +94,10 @@ END_EXTERN_C() php_stream_notification_notify((context), PHP_STREAM_NOTIFY_PROGRESS, PHP_STREAM_NOTIFY_SEVERITY_INFO, \ NULL, 0, (bsofar), (bmax), NULL); } } while(0) +#define php_stream_notify_completed(context) do { if ((context) && (context)->notifier) { \ + php_stream_notification_notify((context), PHP_STREAM_NOTIFY_COMPLETED, PHP_STREAM_NOTIFY_SEVERITY_INFO, \ + NULL, 0, (context)->notifier->progress, (context)->notifier->progress_max, NULL); } } while(0) + #define php_stream_notify_progress_init(context, sofar, bmax) do { if ((context) && (context)->notifier) { \ (context)->notifier->progress = (sofar); \ (context)->notifier->progress_max = (bmax); \ diff --git a/main/streams/xp_socket.c b/main/streams/xp_socket.c index 4ea0dc8e880bf..fb70fa3744446 100644 --- a/main/streams/xp_socket.c +++ b/main/streams/xp_socket.c @@ -189,9 +189,11 @@ static ssize_t php_sockop_read(php_stream *stream, char *buf, size_t count) } ssize_t nr_bytes = recv(sock->socket, buf, XP_SOCK_BUF_SIZE(count), recv_flags); - int err = php_socket_errno(); + + bool old_eof = stream->eof; if (nr_bytes < 0) { + int err = php_socket_errno(); if (PHP_IS_TRANSIENT_ERROR(err)) { nr_bytes = 0; } else { @@ -204,6 +206,9 @@ static ssize_t php_sockop_read(php_stream *stream, char *buf, size_t count) if (nr_bytes > 0) { php_stream_notify_progress_increment(PHP_STREAM_CONTEXT(stream), nr_bytes, 0); } + if (old_eof != stream->eof) { + php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); + } return nr_bytes; } From c0ba29c16b04d73b90a5ea2237f29113d100fabc Mon Sep 17 00:00:00 2001 From: nielsdos <7771979+nielsdos@users.noreply.github.com> Date: Mon, 8 May 2023 20:30:56 +0200 Subject: [PATCH 2/3] Address review comments --- ext/openssl/xp_ssl.c | 4 ---- main/streams/streams.c | 39 ++++++++++++++++++++++++++++++--------- main/streams/xp_socket.c | 7 +------ 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/ext/openssl/xp_ssl.c b/ext/openssl/xp_ssl.c index 2b6141008c6af..7b604be043a5c 100644 --- a/ext/openssl/xp_ssl.c +++ b/ext/openssl/xp_ssl.c @@ -2035,7 +2035,6 @@ static ssize_t php_openssl_sockop_io(int read, php_stream *stream, char *buf, si } /* Main IO loop. */ - bool old_eof = stream->eof; do { struct timeval cur_time, elapsed_time, left_time; @@ -2144,9 +2143,6 @@ static ssize_t php_openssl_sockop_io(int read, php_stream *stream, char *buf, si if (nr_bytes > 0) { php_stream_notify_progress_increment(PHP_STREAM_CONTEXT(stream), nr_bytes, 0); } - if (old_eof != stream->eof) { - php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); - } /* And if we were originally supposed to be blocking, let's reset the socket to that. */ if (began_blocked && php_set_sock_blocking(sslsock->s.socket, 1) == SUCCESS) { diff --git a/main/streams/streams.c b/main/streams/streams.c index f655faef10cbf..dfaba3afba6f4 100644 --- a/main/streams/streams.c +++ b/main/streams/streams.c @@ -542,6 +542,9 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) { /* allocate/fill the buffer */ + zend_result retval; + bool old_eof = stream->eof; + if (stream->readfilters.head) { size_t to_read_now = MIN(size, stream->chunk_size); char *chunk_buf; @@ -562,7 +565,8 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) justread = stream->ops->read(stream, chunk_buf, stream->chunk_size); if (justread < 0 && stream->writepos == stream->readpos) { efree(chunk_buf); - return FAILURE; + retval = FAILURE; + goto out; } else if (justread > 0) { bucket = php_stream_bucket_new(stream, chunk_buf, justread, 0, 0); @@ -633,7 +637,8 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) * further reads should fail. */ stream->eof = 1; efree(chunk_buf); - return FAILURE; + retval = FAILURE; + goto out; } if (justread <= 0) { @@ -642,8 +647,7 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) } efree(chunk_buf); - return SUCCESS; - + retval = SUCCESS; } else { /* is there enough data in the buffer ? */ if (stream->writepos - stream->readpos < (zend_off_t)size) { @@ -670,12 +674,19 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) stream->readbuflen - stream->writepos ); if (justread < 0) { - return FAILURE; + retval = FAILURE; + goto out; } stream->writepos += justread; } - return SUCCESS; + retval = SUCCESS; + } + +out: + if (old_eof != stream->eof) { + php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); } + return retval; } PHPAPI ssize_t _php_stream_read(php_stream *stream, char *buf, size_t size) @@ -1124,6 +1135,8 @@ PHPAPI zend_string *php_stream_get_record(php_stream *stream, size_t maxlen, con static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, size_t count) { ssize_t didwrite = 0; + ssize_t retval; + bool old_eof = stream->eof; /* if we have a seekable stream we need to ensure that data is written at the * current stream->position. This means invalidating the read buffer and then @@ -1140,9 +1153,11 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz /* If we already successfully wrote some bytes and a write error occurred * later, report the successfully written bytes. */ if (didwrite == 0) { - return justwrote; + retval = justwrote; + goto out; } - return didwrite; + retval = didwrite; + goto out; } buf += justwrote; @@ -1151,7 +1166,13 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz stream->position += justwrote; } - return didwrite; + retval = didwrite; + +out: + if (old_eof != stream->eof) { + php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); + } + return retval; } /* push some data through the write filter chain. diff --git a/main/streams/xp_socket.c b/main/streams/xp_socket.c index fb70fa3744446..4ea0dc8e880bf 100644 --- a/main/streams/xp_socket.c +++ b/main/streams/xp_socket.c @@ -189,11 +189,9 @@ static ssize_t php_sockop_read(php_stream *stream, char *buf, size_t count) } ssize_t nr_bytes = recv(sock->socket, buf, XP_SOCK_BUF_SIZE(count), recv_flags); - - bool old_eof = stream->eof; + int err = php_socket_errno(); if (nr_bytes < 0) { - int err = php_socket_errno(); if (PHP_IS_TRANSIENT_ERROR(err)) { nr_bytes = 0; } else { @@ -206,9 +204,6 @@ static ssize_t php_sockop_read(php_stream *stream, char *buf, size_t count) if (nr_bytes > 0) { php_stream_notify_progress_increment(PHP_STREAM_CONTEXT(stream), nr_bytes, 0); } - if (old_eof != stream->eof) { - php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); - } return nr_bytes; } From d0834fc2e8b80130675fb4a27b596b4538ba0ee2 Mon Sep 17 00:00:00 2001 From: nielsdos <7771979+nielsdos@users.noreply.github.com> Date: Sat, 13 May 2023 15:51:44 +0200 Subject: [PATCH 3/3] Address review comments --- main/streams/streams.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/main/streams/streams.c b/main/streams/streams.c index dfaba3afba6f4..aa0e6f2952061 100644 --- a/main/streams/streams.c +++ b/main/streams/streams.c @@ -566,7 +566,7 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) if (justread < 0 && stream->writepos == stream->readpos) { efree(chunk_buf); retval = FAILURE; - goto out; + goto out_check_eof; } else if (justread > 0) { bucket = php_stream_bucket_new(stream, chunk_buf, justread, 0, 0); @@ -638,7 +638,7 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) stream->eof = 1; efree(chunk_buf); retval = FAILURE; - goto out; + goto out_is_eof; } if (justread <= 0) { @@ -647,7 +647,7 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) } efree(chunk_buf); - retval = SUCCESS; + return SUCCESS; } else { /* is there enough data in the buffer ? */ if (stream->writepos - stream->readpos < (zend_off_t)size) { @@ -675,15 +675,18 @@ PHPAPI zend_result _php_stream_fill_read_buffer(php_stream *stream, size_t size) ); if (justread < 0) { retval = FAILURE; - goto out; + goto out_check_eof; } stream->writepos += justread; + retval = SUCCESS; + goto out_check_eof; } - retval = SUCCESS; + return SUCCESS; } -out: +out_check_eof: if (old_eof != stream->eof) { +out_is_eof: php_stream_notify_completed(PHP_STREAM_CONTEXT(stream)); } return retval; @@ -1136,7 +1139,6 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz { ssize_t didwrite = 0; ssize_t retval; - bool old_eof = stream->eof; /* if we have a seekable stream we need to ensure that data is written at the * current stream->position. This means invalidating the read buffer and then @@ -1147,6 +1149,8 @@ static ssize_t _php_stream_write_buffer(php_stream *stream, const char *buf, siz stream->ops->seek(stream, stream->position, SEEK_SET, &stream->position); } + bool old_eof = stream->eof; + while (count > 0) { ssize_t justwrote = stream->ops->write(stream, buf, count); if (justwrote <= 0) {