Skip to content

Commit 1b64f2e

Browse files
committed
Remove uses of at-async in favor of at-spawn. at-async tends to "poison" cooperating
tasks to be sticky to the thread that is running them or the parent's task's thread.
1 parent 37c6adc commit 1b64f2e

File tree

16 files changed

+98
-145
lines changed

16 files changed

+98
-145
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ jobs:
3434
- os: windows-latest
3535
version: '1'
3636
arch: x86
37+
exclude:
38+
- os: windows-latest
39+
version: '1.6'
3740
steps:
3841
- uses: actions/checkout@v3
3942
- uses: julia-actions/setup-julia@v1
@@ -55,6 +58,7 @@ jobs:
5558
env:
5659
PIE_SOCKET_API_KEY: ${{ secrets.PIE_SOCKET_API_KEY }}
5760
JULIA_VERSION: ${{ matrix.version }}
61+
JULIA_NUM_THREADS: 2
5862
- uses: julia-actions/julia-processcoverage@v1
5963
- uses: codecov/codecov-action@v1
6064
with:

Project.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "HTTP"
22
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
33
authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"]
4-
version = "1.8.0"
4+
version = "1.9.0"
55

66
[deps]
77
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
@@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2121

2222
[compat]
2323
CodecZlib = "0.7"
24-
ConcurrentUtilities = "2.1"
24+
ConcurrentUtilities = "2.2"
2525
LoggingExtras = "0.4.9,1"
2626
MbedTLS = "0.6.8, 0.7, 1"
2727
OpenSSL = "1.3"

src/Connections.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ end
9595
Used for "hashing" a Connection object on just the key properties necessary for determining
9696
connection re-useability. That is, when a new request calls `newconnection`, we take the
9797
request parameters of host and port, and if ssl verification is required, if keepalive is enabled,
98-
and if an existing Connection was already created with the exact
98+
and if an existing Connection was already created with the exact.
9999
same parameters, we can re-use it (as long as it's not already being used, obviously).
100100
"""
101101
connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection)
@@ -299,7 +299,7 @@ function IOExtras.closeread(c::Connection)
299299
c.readable = false
300300
@debugv 3 "✉️ Read done: $c"
301301
if c.clientconnection
302-
t = @async monitor_idle_connection(c)
302+
t = Threads.@spawn monitor_idle_connection(c)
303303
@isdefined(errormonitor) && errormonitor(t)
304304
end
305305
return
@@ -424,7 +424,7 @@ function connection_isvalid(c, idle_timeout)
424424
end
425425

426426
@noinline connection_limit_warning(cl) = cl === nothing ||
427-
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` or pass a connection pool like `pool=HTTP.Pool($cl)` instead."
427+
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` before any requests are made or construct a shared pool via `POOL = HTTP.Pool($cl)` and pass to each request like `pool=POOL` instead."
428428

429429
"""
430430
newconnection(type, host, port) -> Connection
@@ -509,7 +509,7 @@ function getconnection(::Type{TCPSocket},
509509
tcp = Sockets.TCPSocket()
510510
Sockets.connect!(tcp, addr, p)
511511
try
512-
try_with_timeout(connect_timeout) do
512+
try_with_timeout(connect_timeout) do _
513513
Sockets.wait_connected(tcp)
514514
keepalive && keepalive!(tcp)
515515
end
@@ -523,7 +523,7 @@ function getconnection(::Type{TCPSocket},
523523
end
524524
return tcp
525525
catch e
526-
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
526+
lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e
527527
end
528528
end
529529
# If no connetion could be set up, to any address, throw last error
@@ -624,7 +624,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
624624
# if the upgrade fails, an error will be thrown and the original c will be closed
625625
# in ConnectionRequest
626626
tls = if readtimeout > 0
627-
try_with_timeout(readtimeout) do
627+
try_with_timeout(readtimeout) do _
628628
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
629629
end
630630
else

src/Exceptions.jl

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Exceptions
22

3-
export @try, try_with_timeout, HTTPError, ConnectError, TimeoutError, StatusError, RequestError
3+
export @try, HTTPError, ConnectError, TimeoutError, StatusError, RequestError, current_exceptions_to_string
44
using LoggingExtras
55
import ..HTTP # for doc references
66

@@ -25,24 +25,6 @@ macro $(:try)(exes...)
2525
end
2626
end # @eval
2727

28-
function try_with_timeout(f, timeout)
29-
ch = Channel(0)
30-
timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
31-
@async begin
32-
try
33-
put!(ch, $f())
34-
catch e
35-
if !(e isa HTTPError)
36-
e = CapturedException(e, catch_backtrace())
37-
end
38-
close(ch, e)
39-
finally
40-
close(timer)
41-
end
42-
end
43-
return take!(ch)
44-
end
45-
4628
abstract type HTTPError <: Exception end
4729

4830
"""
@@ -97,4 +79,12 @@ struct RequestError <: HTTPError
9779
error::Any
9880
end
9981

82+
function current_exceptions_to_string(curr_exc)
83+
buf = IOBuffer()
84+
println(buf)
85+
println(buf, "\n===========================\nHTTP Error message:\n")
86+
Base.showerror(buf, curr_exc)
87+
return String(take!(buf))
88+
end
89+
10090
end # module Exceptions

src/HTTP.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const DEBUG_LEVEL = Ref(0)
77

88
Base.@deprecate escape escapeuri
99

10-
using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS
10+
using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS, OpenSSL
1111

1212
function access_threaded(f, v::Vector)
1313
tid = Threads.threadid()
@@ -24,7 +24,7 @@ end
2424

2525
function open end
2626

27-
const SOCKET_TYPE_TLS = Ref{Any}(MbedTLS.SSLContext)
27+
const SOCKET_TYPE_TLS = Ref{Any}(OpenSSL.SSLStream)
2828

2929
include("Conditions.jl") ;using .Conditions
3030
include("access_log.jl")

src/IOExtras.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ else
8484
end
8585

8686
tcpsocket(io::SSLContext)::TCPSocket = io.bio
87-
tcpsocket(io::SSLStream)::TCPSocket = io.bio_read_stream.io
87+
tcpsocket(io::SSLStream)::TCPSocket = io.io
8888
tcpsocket(io::TCPSocket)::TCPSocket = io
8989

9090
localport(io) = try !isopen(tcpsocket(io)) ? 0 :

src/Streams.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ IOExtras.isopen(http::Stream) = isopen(http.stream)
5656

5757
# Writing HTTP Messages
5858

59-
messagetowrite(http::Stream{<:Response}) = http.message.request
59+
messagetowrite(http::Stream{<:Response}) = http.message.request::Request
6060
messagetowrite(http::Stream{<:Request}) = http.message.response
6161

6262
IOExtras.iswritable(http::Stream) = iswritable(http.stream)
@@ -372,7 +372,7 @@ function IOExtras.closeread(http::Stream{<:Response})
372372
else
373373

374374
# Discard body bytes that were not read...
375-
while !eof(http)
375+
@try Base.IOError EOFError while !eof(http)
376376
readavailable(http)
377377
end
378378

src/clientlayers/ConnectionRequest.jl

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module ConnectionRequest
22

3-
using URIs, Sockets, Base64, LoggingExtras
3+
using URIs, Sockets, Base64, LoggingExtras, ConcurrentUtilities
44
using MbedTLS: SSLContext, SSLConfig
55
using OpenSSL: SSLStream
66
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions
@@ -55,7 +55,7 @@ Close the connection if the request throws an exception.
5555
Otherwise leave it open so that it can be reused.
5656
"""
5757
function connectionlayer(handler)
58-
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, kw...)
58+
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
5959
local io, stream
6060
if proxy !== nothing
6161
target_url = req.url
@@ -79,7 +79,8 @@ function connectionlayer(handler)
7979
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
8080
catch e
8181
if logerrors
82-
@error "HTTP.ConnectError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
82+
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
83+
@error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
8384
end
8485
req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1
8586
throw(ConnectError(string(url), e))
@@ -98,7 +99,7 @@ function connectionlayer(handler)
9899
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
99100
end
100101
r = if readtimeout > 0
101-
try_with_timeout(readtimeout) do
102+
try_with_timeout(readtimeout) do _
102103
connect_tunnel(io, target_url, req)
103104
end
104105
else
@@ -115,7 +116,7 @@ function connectionlayer(handler)
115116
end
116117

117118
stream = Stream(req.response, io)
118-
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, kw...)
119+
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
119120
catch e
120121
while true
121122
if e isa CompositeException
@@ -126,8 +127,10 @@ function connectionlayer(handler)
126127
break
127128
end
128129
end
129-
if logerrors && !(e isa StatusError || e isa TimeoutError)
130-
@error "HTTP.ConnectionRequest" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
130+
root_err = e isa CapturedException ? e.ex : e
131+
if logerrors && !(root_err isa StatusError || root_err isa Exceptions.TimeoutError || root_err isa Base.IOError)
132+
err = current_exceptions_to_string(e)
133+
@error err type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag
131134
end
132135
@debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
133136
shouldreuse = false
@@ -136,7 +139,7 @@ function connectionlayer(handler)
136139
# idempotency of the request
137140
req.context[:nothingwritten] = true
138141
end
139-
e isa HTTPError || throw(RequestError(req, e))
142+
root_err isa HTTPError || throw(RequestError(req, e))
140143
rethrow(e)
141144
finally
142145
releaseconnection(io, shouldreuse; kw...)

src/clientlayers/ExceptionRequest.jl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ using ..IOExtras, ..Messages, ..Exceptions
1010
Throw a `StatusError` if the request returns an error response status.
1111
"""
1212
function exceptionlayer(handler)
13-
return function exceptions(stream; status_exception::Bool=true, logerrors::Bool=false, kw...)
14-
res = handler(stream; logerrors=logerrors, kw...)
13+
return function exceptions(stream; status_exception::Bool=true, timedout=nothing, logerrors::Bool=false, logtag=nothing, kw...)
14+
res = handler(stream; timedout=timedout, logerrors=logerrors, logtag=logtag, kw...)
1515
if status_exception && iserror(res)
1616
req = res.request
1717
req.context[:status_errors] = get(req.context, :status_errors, 0) + 1
1818
e = StatusError(res.status, req.method, req.target, res)
19-
if logerrors
20-
@error "HTTP.StatusError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
19+
if logerrors && (timedout === nothing || !timedout[])
20+
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
21+
@error err type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag
2122
end
2223
throw(e)
2324
else

src/clientlayers/StreamRequest.jl

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status
1717
indicates that the server does not wish to receive the message body.
1818
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
1919
"""
20-
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, kw...)::Response
20+
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, logtag=nothing, timedout=nothing, kw...)::Response
2121
response = stream.message
2222
req = response.request
2323
@debugv 1 sprintcompact(req)
@@ -33,43 +33,45 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
3333
try
3434
@sync begin
3535
if iofunction === nothing
36-
@async try
36+
Threads.@spawn try
3737
writebody(stream, req)
38-
@debugv 2 "client closewrite"
39-
closewrite(stream)
4038
finally
4139
req.context[:write_duration_ms] = get(req.context, :write_duration_ms, 0.0) + ((time() - write_start) * 1000)
40+
@debugv 2 "client closewrite"
41+
closewrite(stream)
4242
end
4343
read_start = time()
44-
@async try
44+
Threads.@spawn try
4545
@debugv 2 "client startread"
4646
startread(stream)
47-
if isaborted(stream)
48-
# The server may have closed the connection.
49-
# Don't propagate such errors.
50-
@try Base.IOError close(stream.stream)
47+
if !isaborted(stream)
48+
readbody(stream, response, decompress)
5149
end
52-
readbody(stream, response, decompress)
5350
finally
5451
req.context[:read_duration_ms] = get(req.context, :read_duration_ms, 0.0) + ((time() - read_start) * 1000)
52+
@debugv 2 "client closeread"
53+
closeread(stream)
5554
end
5655
else
57-
iofunction(stream)
56+
try
57+
iofunction(stream)
58+
finally
59+
closewrite(stream)
60+
closeread(stream)
61+
end
5862
end
5963
end
6064
catch e
61-
if logerrors
62-
@error "HTTP.IOError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
65+
if timedout === nothing || !timedout[]
66+
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
67+
if logerrors
68+
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
69+
@error err type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag
70+
end
6371
end
64-
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
6572
rethrow()
6673
end
6774

68-
@debugv 2 "client closewrite"
69-
closewrite(stream)
70-
@debugv 2 "client closeread"
71-
closeread(stream)
72-
7375
@debugv 1 sprintcompact(response)
7476
@debugv 2 sprint(show, response)
7577
return response
@@ -150,7 +152,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
150152
# so using the default write is fastest because it utilizes
151153
# readavailable under the hood, for which BufferStream is optimized
152154
n = write(body, buf_or_stream)
153-
elseif buf_or_stream isa Stream
155+
elseif buf_or_stream isa Stream{Response}
154156
# for HTTP.Stream, there's already an optimized read method
155157
# that just needs an IOBuffer to write into
156158
n = readall!(buf_or_stream, body)
@@ -161,7 +163,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
161163
res.body = read(buf_or_stream)
162164
n = length(res.body)
163165
end
164-
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream
166+
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream{Response}
165167
# optimization for IOBuffer response_stream to avoid temporary allocations
166168
n = readall!(buf_or_stream, res.body)
167169
else

0 commit comments

Comments
 (0)