Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 60 additions & 49 deletions src/waltz/http/fd_http_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1067,75 +1067,86 @@ write_conn_ws( fd_http_server_t * http,
if( FD_UNLIKELY( maybe_write_pong( http, conn_idx ) ) ) return;
if( FD_UNLIKELY( !conn->send_frame_cnt ) ) return;

fd_http_server_ws_frame_t * frame = &conn->send_frames[ conn->send_frame_idx ];
switch( conn->send_frame_state ) {
case FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER: {
uchar header[ 10 ];
struct iovec iovecs[ 512UL*2UL ];
uchar headers[ 512UL ][ 10UL ];

ulong batch_cnt = fd_ulong_min( conn->send_frame_cnt, 512UL );
ulong out_idx = 0UL;
for( ulong i=0UL; i<batch_cnt; i++ ) {
fd_http_server_ws_frame_t * frame = &conn->send_frames[ (conn->send_frame_idx+1UL) % http->max_ws_send_frame_cnt ];
if( FD_UNLIKELY( i || conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER ) ) {
ulong header_len;
header[ 0 ] = 0x80 | fd_uchar_if(frame->compressed, 0x02, 0x01); /* FIN, 0x1 for text, 0x2 for binary */
headers[ i ][ 0 ] = 0x80 | fd_uchar_if(frame->compressed, 0x02, 0x01); /* FIN, 0x1 for text, 0x2 for binary */
if( FD_LIKELY( frame->len<126UL ) ) {
header[ 1 ] = (uchar)frame->len;
headers[ i ][ 1 ] = (uchar)frame->len;
header_len = 2UL;
} else if( FD_LIKELY( frame->len<65536UL ) ) {
header[ 1 ] = 126;
header[ 2 ] = (uchar)(frame->len>>8);
header[ 3 ] = (uchar)(frame->len);
headers[ i ][ 1 ] = 126;
headers[ i ][ 2 ] = (uchar)(frame->len>>8);
headers[ i ][ 3 ] = (uchar)(frame->len);
header_len = 4UL;
} else {
header[ 1 ] = 127;
header[ 2 ] = (uchar)(frame->len>>56);
header[ 3 ] = (uchar)(frame->len>>48);
header[ 4 ] = (uchar)(frame->len>>40);
header[ 5 ] = (uchar)(frame->len>>32);
header[ 6 ] = (uchar)(frame->len>>24);
header[ 7 ] = (uchar)(frame->len>>16);
header[ 8 ] = (uchar)(frame->len>>8);
header[ 9 ] = (uchar)(frame->len);
headers[ i ][ 1 ] = 127;
headers[ i ][ 2 ] = (uchar)(frame->len>>56);
headers[ i ][ 3 ] = (uchar)(frame->len>>48);
headers[ i ][ 4 ] = (uchar)(frame->len>>40);
headers[ i ][ 5 ] = (uchar)(frame->len>>32);
headers[ i ][ 6 ] = (uchar)(frame->len>>24);
headers[ i ][ 7 ] = (uchar)(frame->len>>16);
headers[ i ][ 8 ] = (uchar)(frame->len>>8);
headers[ i ][ 9 ] = (uchar)(frame->len);
header_len = 10UL;
}

long sz = send( http->pollfds[ conn_idx ].fd, header+conn->send_frame_bytes_written, header_len-conn->send_frame_bytes_written, MSG_NOSIGNAL );
if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
else if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
return;
}
else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */
ulong header_bytes_written = fd_ulong_if( i==0UL, conn->send_frame_bytes_written, 0UL );

http->metrics.bytes_written += (ulong)sz;
conn->send_frame_bytes_written += (ulong)sz;
if( FD_UNLIKELY( conn->send_frame_bytes_written==header_len ) ) {
conn->send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_DATA;
conn->send_frame_bytes_written = 0UL;
}
break;
iovecs[ out_idx ].iov_base = headers[ i ]+header_bytes_written;
iovecs[ out_idx ].iov_len = header_len-header_bytes_written;
out_idx++;
}
case FD_HTTP_SERVER_SEND_FRAME_STATE_DATA: {
/* frame->off can point to either the compressed or uncompressed region */
uchar const * data = http->oring+(frame->off%http->oring_sz)+conn->send_frame_bytes_written;
ulong data_sz = frame->len-conn->send_frame_bytes_written;

long sz = send( http->pollfds[ conn_idx ].fd, data, data_sz, MSG_NOSIGNAL );
if( FD_UNLIKELY( -1==sz && errno==EAGAIN ) ) return; /* No data was written, continue. */
else if( FD_UNLIKELY( -1==sz && is_expected_network_error( errno ) ) ) {
close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
return;
}
else if( FD_UNLIKELY( -1==sz ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, strerror( errno ) )); /* Unexpected programmer error, abort */

http->metrics.bytes_written += (ulong)sz;
conn->send_frame_bytes_written += (ulong)sz;
if( FD_UNLIKELY( conn->send_frame_bytes_written==frame->len ) ) {
ulong data_bytes_written = fd_ulong_if( i==0UL && conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_DATA, conn->send_frame_bytes_written, 0UL );
iovecs[ out_idx ].iov_base = http->oring+(frame->off%http->oring_sz)+data_bytes_written;
iovecs[ out_idx ].iov_len = frame->len-data_bytes_written;
out_idx++;
}

struct mmsghdr msg = {0};
msg.msg_hdr.msg_iov = iovecs;
msg.msg_hdr.msg_iovlen = out_idx;

int result = sendmmsg( http->pollfds[ conn_idx ].fd, &msg, 1U, MSG_NOSIGNAL );
if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return; /* No data was written, continue. */
else if( FD_UNLIKELY( -1==result && is_expected_network_error( errno ) ) ) {
close_conn( http, conn_idx, FD_HTTP_SERVER_CONNECTION_CLOSE_PEER_RESET );
return;
}
else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "write failed (%i-%s)", errno, fd_io_strerror( errno ) )); /* Unexpected programmer error, abort */

FD_TEST( result==1 );

ulong sent = (ulong)msg.msg_len;
http->metrics.bytes_written += sent;

for( ulong i=0UL; i<out_idx; i++ ) {
ulong iov_len = iovecs[ i ].iov_len;
if( FD_LIKELY( sent>=iov_len ) ) {
if( FD_LIKELY( conn->send_frame_state==FD_HTTP_SERVER_SEND_FRAME_STATE_DATA ) ) {
conn->send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_HEADER;
conn->send_frame_idx = (conn->send_frame_idx+1UL) % http->max_ws_send_frame_cnt;
conn->send_frame_cnt--;
conn->send_frame_bytes_written = 0UL;

ws_conn_treap_ele_remove( http->ws_conn_treap, conn, http->ws_conns );
if( FD_LIKELY( conn->send_frame_cnt ) ) ws_conn_treap_ele_insert( http->ws_conn_treap, conn, http->ws_conns );

http->metrics.frames_written++;
} else {
conn->send_frame_state = FD_HTTP_SERVER_SEND_FRAME_STATE_DATA;
}

sent -= iov_len;
} else {
conn->send_frame_bytes_written = sent;
break;
}
}
Expand All @@ -1160,7 +1171,7 @@ fd_http_server_poll( fd_http_server_t * http,
for( ulong i=0UL; i<http->max_conns+http->max_ws_conns+1UL; i++ ) {
if( FD_UNLIKELY( -1==http->pollfds[ i ].fd ) ) continue;
if( FD_UNLIKELY( i==http->max_conns+http->max_ws_conns ) ) {
accept_conns( http );
if( FD_LIKELY( http->pollfds[ i ].revents & POLLIN ) ) accept_conns( http );
} else {
if( FD_LIKELY( http->pollfds[ i ].revents & POLLIN ) ) read_conn( http, i );
if( FD_UNLIKELY( -1==http->pollfds[ i ].fd ) ) continue;
Expand Down
Loading