Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,15 @@ function newconnection(::Type{T},
host::AbstractString,
port::AbstractString;
connection_limit=default_connection_limit[],
forcenew::Bool=false,
idle_timeout=typemax(Int),
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
kw...)::Connection where {T <: IO}
return acquire(
POOL,
(T, host, port, require_ssl_verification, true);
max_concurrent_connections=Int(connection_limit),
forcenew=forcenew,
idle_timeout=Int(idle_timeout)) do
Connection(host, port,
idle_timeout, require_ssl_verification,
Expand Down
7 changes: 6 additions & 1 deletion src/clientlayers/MessageRequest.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module MessageRequest

using URIs
using ..IOExtras, ..Messages, ..Parsers
using ..IOExtras, ..Messages, ..Parsers, ..Exceptions

export messagelayer

Expand All @@ -17,6 +17,11 @@ function messagelayer(handler)
local resp
try
resp = handler(req; response_stream=response_stream, kw...)
catch e
if e isa StatusError
resp = e.response
end
rethrow(e)
finally
if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body)
if isbytes(resp.body)
Expand Down
10 changes: 5 additions & 5 deletions src/connectionpools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ call the provided function `f()`, which must return a new connection instance of
This new connection instance will be tracked by the `Pod` and MUST be returned to the `Pod`
after use by calling `release(pod, conn)`.
"""
function acquire(f, pod::Pod)
function acquire(f, pod::Pod, forcenew::Bool=false)
lock(pod.lock)
try
# if there are idle connections in the pod,
# let's check if they're still valid and can be used again
while !isempty(pod.conns)
while !forcenew && !isempty(pod.conns)
# Pod connections are FIFO, so grab the earliest returned connection
# println("$(taskid()): checking idle_timeout connections for reuse")
conn = popfirst!(pod.conns)
Expand Down Expand Up @@ -126,7 +126,7 @@ function acquire(f, pod::Pod)
# is notified
# println("$(taskid()): connection pool maxxed out; waiting for connection to be released to the pod")
conn = wait(pod.lock)
if conn !== nothing
if !forcenew && conn !== nothing
# println("$(taskid()): checking recently released connection validity for reuse")
if isvalid(pod, conn)
return trackconnection!(pod, conn)
Expand Down Expand Up @@ -242,11 +242,11 @@ created and passed the `max`, `idle_timeout`, and `reuse` keyword arguments if p
The provided function `f` must create a new connection instance of type `C`.
The acquired connection MUST be returned to the pool by calling `release(pool, key, conn)` exactly once.
"""
function acquire(f, pool::Pool{C}, key; kw...) where {C}
function acquire(f, pool::Pool{C}, key; forcenew::Bool=false, kw...) where {C}
pod = lock(pool.lock) do
get!(() -> Pod(C; kw...), pool.pods, key)
end
return acquire(f, pod)
return acquire(f, pod, forcenew)
end

function acquire(pool::Pool{C}, key, conn::C; kw...) where {C}
Expand Down
21 changes: 18 additions & 3 deletions test/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -525,15 +525,30 @@ end
seekstart(req_body)
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false)
@test String(take!(res_body)) == "500 unexpected error"
# even if status_exception=true, we should still get the right response body
shouldfail[] = true
seekstart(req_body)
try
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, forcenew=true)
catch e
@test e isa HTTP.StatusError
@test e.status == 500
@test String(take!(res_body)) == "500 unexpected error"
end
# don't throw a 500, but set status to status we don't retry by default
shouldfail[] = false
status[] = 404
seekstart(req_body)
check = (s, ex, req, resp) -> begin
@test String(resp.body) == "404 unexpected error"
resp.status == 404
str = String(resp.body)
if str != "404 unexpected error" || resp.status != 404
@error "unexpected response body" str
return false
end
@test str == "404 unexpected error"
return resp.status == 404
end
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=(s, ex, req, resp) -> resp.status == 404)
resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry_check=check)
@test isok(resp)
@test String(take!(res_body)) == "hey there sailor"
finally
Expand Down