Skip to content

Commit 02ba345

Browse files
authored
Allow customizing retry logic for http requests (#974)
* Allow more custom retry logic Allow user to pass `retry_delays` with a custom `ExponentialBackoff` object. Allow user to pass `retry_check` that is a function of the form: `check(s, ex, req, resp) -> Bool` which will be called if the default retry logic doesn't allow retrying. * Update docs
1 parent 16383f2 commit 02ba345

File tree

7 files changed

+79
-45
lines changed

7 files changed

+79
-45
lines changed

src/HTTP.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ Retry arguments:
135135
- `retry = true`, retry idempotent requests in case of error.
136136
- `retries = 4`, number of times to retry.
137137
- `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST.
138+
- `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
139+
- `retry_check = (s, ex, req, resp) -> Bool`, provide a custom function to control whether a retry should be attempted.
140+
The function should accept 4 arguments: the delay state, exception, request, and response, and return `true` if a retry should be attempted.
138141
139142
Redirect arguments:
140143
- `redirect = true`, follow 3xx redirect responses; i.e. additional requests will be made to the redirected location

src/Messages.jl

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ module Messages
5151

5252
export Message, Request, Response,
5353
reset!, status, method, headers, uri, body, resource,
54-
iserror, isredirect, retryable, ischunked, issafe, isidempotent,
54+
iserror, isredirect, retryablebody, retryable, ischunked, issafe, isidempotent,
5555
header, hasheader, headercontains, setheader, defaultheader!, appendheader,
5656
removeheader, mkheaders, readheaders, headerscomplete,
5757
readchunksize,
@@ -124,9 +124,8 @@ function reset!(r::Response)
124124
if !isempty(r.headers)
125125
empty!(r.headers)
126126
end
127-
if r.body isa Vector{UInt8} && !isempty(r.body)
128-
empty!(r.body)
129-
end
127+
delete!(r.request.context, :response_body)
128+
return
130129
end
131130

132131
status(r::Response) = getfield(r, :status)
@@ -224,17 +223,6 @@ https://tools.ietf.org/html/rfc7231#section-4.2.1
224223
issafe(r::Request) = issafe(r.method)
225224
issafe(method) = method in ["GET", "HEAD", "OPTIONS", "TRACE"]
226225

227-
"""
228-
isidempotent(::Request)
229-
230-
https://tools.ietf.org/html/rfc7231#section-4.2.2
231-
"""
232-
isidempotent(r::Request) = isidempotent(r.method)
233-
isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"]
234-
retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false)
235-
allow_retries(r::Request) = get(r.context, :allow_retries, false)
236-
nothing_written(r::Request) = get(r.context, :nothingwritten, false)
237-
238226
"""
239227
iserror(::Response)
240228
@@ -258,6 +246,17 @@ isredirect(status) = status in (301, 302, 303, 307, 308)
258246
redirectlimitreached(r::Request) = get(r.context, :redirectlimitreached, false)
259247
allow_redirects(r::Request) = get(r.context, :allow_redirects, false)
260248

249+
"""
250+
isidempotent(::Request)
251+
252+
https://tools.ietf.org/html/rfc7231#section-4.2.2
253+
"""
254+
isidempotent(r::Request) = isidempotent(r.method)
255+
isidempotent(method) = issafe(method) || method in ["PUT", "DELETE"]
256+
retry_non_idempotent(r::Request) = get(r.context, :retry_non_idempotent, false)
257+
allow_retries(r::Request) = get(r.context, :allow_retries, false)
258+
nothing_written(r::Request) = get(r.context, :nothingwritten, false)
259+
261260
# whether the retry limit has been reached for a given request
262261
# set in the RetryRequest layer once the limit is reached
263262
retrylimitreached(r::Request) = get(r.context, :retrylimitreached, false)
@@ -272,8 +271,15 @@ function retryable end
272271
supportsmark(x) = false
273272
supportsmark(x::T) where {T <: IO} = length(Base.methods(mark, Tuple{T}, parentmodule(T))) > 0 || hasfield(T, :mark)
274273

275-
retryable(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} || (r.body isa Vector && all(isbytes, r.body)) ||
276-
(supportsmark(r.body) && ismarked(r.body))) &&
274+
# request body is retryable if it was provided as "bytes", a Dict or NamedTuple,
275+
# or a chunked array of "bytes"; OR if it supports mark() and is marked
276+
retryablebody(r::Request) = (isbytes(r.body) || r.body isa Union{Dict, NamedTuple} ||
277+
(r.body isa Vector && all(isbytes, r.body)) || (supportsmark(r.body) && ismarked(r.body)))
278+
279+
# request is retryable if the body is retryable, the user is allowing retries at all,
280+
# we haven't reached the retry limit, and either nothing has been written yet or
281+
# the request is idempotent or the user has explicitly allowed non-idempotent retries
282+
retryable(r::Request) = retryablebody(r) &&
277283
allow_retries(r) && !retrylimitreached(r) &&
278284
(nothing_written(r) || isidempotent(r) || retry_non_idempotent(r))
279285
retryable(r::Response) = retryable(r.status)

src/clientlayers/ExceptionRequest.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module ExceptionRequest
22

33
export exceptionlayer
44

5-
using ..Messages, ..Exceptions
5+
using ..IOExtras, ..Messages, ..Exceptions
66

77
"""
88
exceptionlayer(handler) -> handler

src/clientlayers/MessageRequest.jl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module MessageRequest
22

33
using URIs
4-
using ..Messages, ..Parsers
4+
using ..IOExtras, ..Messages, ..Parsers
55

66
export messagelayer
77

@@ -14,7 +14,18 @@ Hard-coded as the first layer in the request pipeline.
1414
function messagelayer(handler)
1515
return function(method::String, url::URI, headers::Headers, body; response_stream=nothing, http_version=v"1.1", kw...)
1616
req = Request(method, resource(url), headers, body; url=url, version=http_version, responsebody=response_stream)
17-
return handler(req; response_stream=response_stream, kw...)
17+
local resp
18+
try
19+
resp = handler(req; response_stream=response_stream, kw...)
20+
finally
21+
if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body)
22+
if isbytes(resp.body)
23+
resp.body = resp.request.context[:response_body]
24+
else
25+
write(resp.body, resp.request.context[:response_body])
26+
end
27+
end
28+
end
1829
end
1930
end
2031

src/clientlayers/RetryRequest.jl

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ using ..IOExtras, ..Messages, ..Strings, ..ExceptionRequest, ..Exceptions
55

66
export retrylayer
77

8+
FALSE(x...) = false
9+
810
"""
911
retrylayer(handler) -> handler
1012
@@ -19,7 +21,9 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError`
1921
(if status is `5xx`).
2022
"""
2123
function retrylayer(handler)
22-
return function(req::Request; retry::Bool=true, retries::Int=4, retry_non_idempotent::Bool=false, kw...)
24+
return function(req::Request; retry::Bool=true, retries::Int=4,
25+
retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries), retry_check=FALSE,
26+
retry_non_idempotent::Bool=false, kw...)
2327
if !retry || retries == 0
2428
# no retry
2529
return handler(req; kw...)
@@ -37,11 +41,11 @@ function retrylayer(handler)
3741
end
3842
retryattempt = Ref(0)
3943
retry_request = Base.retry(handler,
40-
delays=ExponentialBackOff(n = retries),
44+
delays=retry_delays,
4145
check=(s, ex) -> begin
4246
retryattempt[] += 1
4347
req.context[:retryattempt] = retryattempt[]
44-
retry = isrecoverable(ex) && retryable(req)
48+
retry = retryable(req) || retryablebody(req) && _retry_check(s, ex, req, retry_check)
4549
if retryattempt[] == retries
4650
req.context[:retrylimitreached] = true
4751
end
@@ -63,20 +67,19 @@ function retrylayer(handler)
6367
end
6468
end
6569

66-
isrecoverable(e) = false
67-
isrecoverable(e::Union{Base.EOFError, Base.IOError, MbedTLS.MbedException, OpenSSL.OpenSSLError}) = true
68-
isrecoverable(e::ArgumentError) = e.msg == "stream is closed or unusable"
69-
isrecoverable(e::Sockets.DNSError) = true
70-
isrecoverable(e::ConnectError) = true
71-
isrecoverable(e::RequestError) = isrecoverable(e.error)
72-
isrecoverable(e::StatusError) = retryable(e.status)
70+
function _retry_check(s, ex, req, check)
71+
resp = req.response
72+
if haskey(req.context, :response_body)
73+
resp.body = req.context[:response_body]
74+
end
75+
return check(s, ex, req, resp)
76+
end
7377

7478
function no_retry_reason(ex, req)
7579
buf = IOBuffer()
7680
show(IOContext(buf, :compact => true), req)
7781
print(buf, ", ",
7882
ex isa StatusError ? "HTTP $(ex.status): " :
79-
!isrecoverable(ex) ? "$ex not recoverable, " : "",
8083
!isbytes(req.body) ? "request streamed, " : "",
8184
!isbytes(req.response.body) ? "response streamed, " : "",
8285
!isidempotent(req) ? "$(req.method) non-idempotent" : "")

src/clientlayers/StreamRequest.jl

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,17 @@ function readbody(stream::Stream, res::Response, decompress::Union{Nothing, Bool
129129
end
130130

131131
function readbody!(stream::Stream, res::Response, buf_or_stream)
132-
if isbytes(res.body)
133-
# normal response body path: read as Vector{UInt8} and store
134-
res.body = read(buf_or_stream)
135-
elseif isredirect(stream) || retryable(stream)
136-
# if response body is a stream, but we're redirecting or
137-
# retrying, store this "temporary" body in the request context
138-
res.request.context[:response_body] = read(buf_or_stream)
132+
if !iserror(res)
133+
if isbytes(res.body)
134+
res.body = read(buf_or_stream)
135+
else
136+
write(res.body, buf_or_stream)
137+
end
139138
else
140-
# normal streaming response body path: write response body out directly
141-
write(res.body, buf_or_stream)
139+
# read the response body into the request context so that it can be
140+
# read by the user if they want to or set later if
141+
# we end up not retrying/redirecting/etc.
142+
res.request.context[:response_body] = read(buf_or_stream)
142143
end
143144
end
144145

test/client.jl

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -496,13 +496,20 @@ end
496496

497497
@testset "Retry with request/response body streams" begin
498498
shouldfail = Ref(true)
499+
status = Ref(200)
499500
server = HTTP.listen!(8080) do http
500501
@assert !eof(http)
501502
msg = String(read(http))
502503
if shouldfail[]
503504
shouldfail[] = false
504505
error("500 unexpected error")
505506
end
507+
HTTP.setstatus(http, status[])
508+
if status[] != 200
509+
HTTP.startwrite(http)
510+
HTTP.write(http, "$(status[]) unexpected error")
511+
status[] = 200
512+
end
506513
HTTP.startwrite(http)
507514
HTTP.write(http, msg)
508515
end
@@ -518,14 +525,17 @@ end
518525
seekstart(req_body)
519526
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false)
520527
@test String(take!(res_body)) == "500 unexpected error"
521-
# when retrying, we can still get access to the most recent failed response body in the response's request context
522-
shouldfail[] = true
528+
# don't throw a 500, but set status to status we don't retry by default
529+
shouldfail[] = false
530+
status[] = 404
523531
seekstart(req_body)
524-
println("making 3rd request")
525-
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body)
532+
check = (s, ex, req, resp) -> begin
533+
@test String(resp.body) == "404 unexpected error"
534+
resp.status == 404
535+
end
536+
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=(s, ex, req, resp) -> resp.status == 404)
526537
@test isok(resp)
527538
@test String(take!(res_body)) == "hey there sailor"
528-
@test String(resp.request.context[:response_body]) == "500 unexpected error"
529539
finally
530540
close(server)
531541
HTTP.ConnectionPool.closeall()

0 commit comments

Comments
 (0)