From 7363c924cf213d55fb082e36ebe62fe70fee11ca Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 13 Dec 2022 13:50:13 -0700 Subject: [PATCH 1/3] Fix setting of response body for StatusError exception As reported in https://github.com/JuliaCloud/AWS.jl/issues/593, the recently refactored retry logic was failing to properly write the response body when a `response_stream` was used and a `StatusError` was thrown. --- src/clientlayers/MessageRequest.jl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/clientlayers/MessageRequest.jl b/src/clientlayers/MessageRequest.jl index 47f34881b..8923a2692 100644 --- a/src/clientlayers/MessageRequest.jl +++ b/src/clientlayers/MessageRequest.jl @@ -1,7 +1,7 @@ module MessageRequest using URIs -using ..IOExtras, ..Messages, ..Parsers +using ..IOExtras, ..Messages, ..Parsers, ..Exceptions export messagelayer @@ -17,6 +17,12 @@ function messagelayer(handler) local resp try resp = handler(req; response_stream=response_stream, kw...) + catch e + if e isa StatusError + resp = e.response + else + rethrow(e) + end finally if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body) if isbytes(resp.body) From dca6572db35e2038577f3c31a6929018406a4158 Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 13 Dec 2022 14:00:56 -0700 Subject: [PATCH 2/3] Fix error still being thrown --- src/clientlayers/MessageRequest.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/clientlayers/MessageRequest.jl b/src/clientlayers/MessageRequest.jl index 8923a2692..cce1bcb98 100644 --- a/src/clientlayers/MessageRequest.jl +++ b/src/clientlayers/MessageRequest.jl @@ -20,9 +20,8 @@ function messagelayer(handler) catch e if e isa StatusError resp = e.response - else - rethrow(e) end + rethrow(e) finally if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body) if isbytes(resp.body) From 3dbb25fc1bbf5e4462b62d725d4d03af2e4dea1e Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 13 Dec 2022 23:22:43 -0700 Subject: [PATCH 3/3] Add test and forcenew option to avoid socket closing race condition --- src/ConnectionPool.jl | 2 ++ src/connectionpools.jl | 10 +++++----- test/client.jl | 21 ++++++++++++++++++--- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index a5abd6470..8b113bdcc 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -351,6 +351,7 @@ function newconnection(::Type{T}, host::AbstractString, port::AbstractString; connection_limit=default_connection_limit[], + forcenew::Bool=false, idle_timeout=typemax(Int), require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), kw...)::Connection where {T <: IO} @@ -358,6 +359,7 @@ function newconnection(::Type{T}, POOL, (T, host, port, require_ssl_verification, true); max_concurrent_connections=Int(connection_limit), + forcenew=forcenew, idle_timeout=Int(idle_timeout)) do Connection(host, port, idle_timeout, require_ssl_verification, diff --git a/src/connectionpools.jl b/src/connectionpools.jl index 0673a03fa..e81cc0410 100644 --- a/src/connectionpools.jl +++ b/src/connectionpools.jl @@ -93,12 +93,12 @@ call the provided function `f()`, which must return a new connection instance of This new connection instance will be tracked by the `Pod` and MUST be returned to the `Pod` after use by calling `release(pod, conn)`. """ -function acquire(f, pod::Pod) +function acquire(f, pod::Pod, forcenew::Bool=false) lock(pod.lock) try # if there are idle connections in the pod, # let's check if they're still valid and can be used again - while !isempty(pod.conns) + while !forcenew && !isempty(pod.conns) # Pod connections are FIFO, so grab the earliest returned connection # println("$(taskid()): checking idle_timeout connections for reuse") conn = popfirst!(pod.conns) @@ -126,7 +126,7 @@ function acquire(f, pod::Pod) # is notified # println("$(taskid()): connection pool maxxed out; waiting for connection to be released to the pod") conn = wait(pod.lock) - if conn !== nothing + if !forcenew && conn !== nothing # println("$(taskid()): checking recently released connection validity for reuse") if isvalid(pod, conn) return trackconnection!(pod, conn) @@ -242,11 +242,11 @@ created and passed the `max`, `idle_timeout`, and `reuse` keyword arguments if p The provided function `f` must create a new connection instance of type `C`. The acquired connection MUST be returned to the pool by calling `release(pool, key, conn)` exactly once. """ -function acquire(f, pool::Pool{C}, key; kw...) where {C} +function acquire(f, pool::Pool{C}, key; forcenew::Bool=false, kw...) where {C} pod = lock(pool.lock) do get!(() -> Pod(C; kw...), pool.pods, key) end - return acquire(f, pod) + return acquire(f, pod, forcenew) end function acquire(pool::Pool{C}, key, conn::C; kw...) where {C} diff --git a/test/client.jl b/test/client.jl index 3741fca7e..50fd20dd3 100644 --- a/test/client.jl +++ b/test/client.jl @@ -525,15 +525,30 @@ end seekstart(req_body) resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false) @test String(take!(res_body)) == "500 unexpected error" + # even if status_exception=true, we should still get the right response body + shouldfail[] = true + seekstart(req_body) + try + resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, forcenew=true) + catch e + @test e isa HTTP.StatusError + @test e.status == 500 + @test String(take!(res_body)) == "500 unexpected error" + end # don't throw a 500, but set status to status we don't retry by default shouldfail[] = false status[] = 404 seekstart(req_body) check = (s, ex, req, resp) -> begin - @test String(resp.body) == "404 unexpected error" - resp.status == 404 + str = String(resp.body) + if str != "404 unexpected error" || resp.status != 404 + @error "unexpected response body" str + return false + end + @test str == "404 unexpected error" + return resp.status == 404 end - resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=(s, ex, req, resp) -> resp.status == 404) + resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=check) @test isok(resp) @test String(take!(res_body)) == "hey there sailor" finally