Skip to content

Commit 3614bef

Browse files
committed
Tunnel: newtunnelconnection
Creates a tunneled connection with a pool key that `aquire` can find on subsequent requests if the connection is kept alive
1 parent 21b9ee6 commit 3614bef

File tree

6 files changed

+143
-77
lines changed

6 files changed

+143
-77
lines changed

src/Connections.jl

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -615,31 +615,6 @@ function sslconnection(::Type{SSLContext}, tcp::TCPSocket, host::AbstractString;
615615
return io
616616
end
617617

618-
function sslupgrade(::Type{IOType}, c::Connection{T},
619-
host::AbstractString;
620-
pool::Union{Nothing, Pool}=nothing,
621-
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
622-
keepalive::Bool=true,
623-
readtimeout::Int=0,
624-
kw...)::Connection{IOType} where {T, IOType}
625-
# initiate the upgrade to SSL
626-
# if the upgrade fails, an error will be thrown and the original c will be closed
627-
# in ConnectionRequest
628-
tls = if readtimeout > 0
629-
try_with_timeout(readtimeout) do _
630-
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
631-
end
632-
else
633-
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
634-
end
635-
# success, now we turn it into a new Connection
636-
conn = Connection(host, "", 0, require_ssl_verification, keepalive, tls)
637-
# release the "old" one, but don't return the connection since we're hijacking the socket
638-
release(getpool(pool, T), connectionkey(c))
639-
# and return the new one
640-
return acquire(() -> conn, getpool(pool, IOType), connectionkey(conn); forcenew=true)
641-
end
642-
643618
function Base.show(io::IO, c::Connection)
644619
nwaiting = has_tcpsocket(c) ? bytesavailable(tcpsocket(c)) : 0
645620
print(

src/HTTP.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ include("Connections.jl") ;using .Connections
4242
const ConnectionPool = Connections
4343
include("StatusCodes.jl") ;using .StatusCodes
4444
include("Messages.jl") ;using .Messages
45+
include("Tunnel.jl") ;using .Tunnel
4546
include("cookies.jl") ;using .Cookies
4647
include("Streams.jl") ;using .Streams
4748

src/Tunnel.jl

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
module Tunnel
2+
3+
export newtunnelconnection
4+
5+
using Sockets, LoggingExtras, NetworkOptions, URIs
6+
using ConcurrentUtilities: acquire, try_with_timeout
7+
8+
using ..Connections, ..Messages, ..Exceptions
9+
using ..Connections: connection_limit_warning, getpool, getconnection, sslconnection, connectionkey, connection_isvalid
10+
11+
function newtunnelconnection(;
12+
target_type::Type{<:IO},
13+
target_host::AbstractString,
14+
target_port::AbstractString,
15+
proxy_type::Type{<:IO},
16+
proxy_host::AbstractString,
17+
proxy_port::AbstractString,
18+
proxy_auth::AbstractString="",
19+
pool::Union{Nothing, Pool}=nothing,
20+
connection_limit=nothing,
21+
forcenew::Bool=false,
22+
idle_timeout=typemax(Int),
23+
connect_timeout::Int=30,
24+
readtimeout::Int=30,
25+
keepalive::Bool=true,
26+
kw...)
27+
connection_limit_warning(connection_limit)
28+
29+
if isempty(target_port)
30+
target_port = istcptype(target_type) ? "80" : "443"
31+
end
32+
33+
require_ssl_verification = get(kw, :require_ssl_verification, NetworkOptions.verify_host(target_host, "SSL"))
34+
host_key = proxy_host * "/" * target_host
35+
port_key = proxy_port * "/" * target_port
36+
key = (host_key, port_key, require_ssl_verification, keepalive, true)
37+
38+
return acquire(
39+
getpool(pool, target_type),
40+
key;
41+
forcenew=forcenew,
42+
isvalid=c->connection_isvalid(c, Int(idle_timeout))) do
43+
44+
conn = Connection(host_key, port_key, idle_timeout, require_ssl_verification, keepalive,
45+
try_with_timeout0(connect_timeout) do _
46+
getconnection(proxy_type, proxy_host, proxy_port; keepalive, kw...)
47+
end
48+
)
49+
try
50+
try_with_timeout0(readtimeout) do _
51+
connect_tunnel(conn, target_host, target_port, proxy_auth)
52+
end
53+
54+
if !istcptype(target_type)
55+
tls = try_with_timeout0(readtimeout) do _
56+
sslconnection(target_type, conn.io, target_host; keepalive, kw...)
57+
end
58+
59+
# success, now we turn it into a new Connection
60+
conn = Connection(host_key, port_key, idle_timeout, require_ssl_verification, keepalive, tls)
61+
end
62+
63+
@assert connectionkey(conn) === key
64+
65+
conn
66+
catch ex
67+
close(conn)
68+
rethrow()
69+
end
70+
end
71+
end
72+
73+
function connect_tunnel(io, target_host, target_port, proxy_auth)
74+
target = "$(URIs.hoststring(target_host)):$(target_port)"
75+
@debug "📡 CONNECT HTTPS tunnel to $target"
76+
headers = Dict("Host" => target)
77+
if (!isempty(proxy_auth))
78+
headers["Proxy-Authorization"] = proxy_auth
79+
end
80+
request = Request("CONNECT", target, headers)
81+
# @debug "connect_tunnel: writing headers"
82+
writeheaders(io, request)
83+
# @debug "connect_tunnel: reading headers"
84+
readheaders(io, request.response)
85+
# @debug "connect_tunnel: done reading headers"
86+
if request.response.status != 200
87+
throw(StatusError(request.response.status,
88+
request.method, request.target, request.response))
89+
end
90+
end
91+
92+
"""
93+
Wrapper to try_with_timeout that optionally disables the timeout if given a non-positive duration.
94+
"""
95+
function try_with_timeout0(f, timeout, ::Type{T}=Any) where {T}
96+
if timeout > 0
97+
try_with_timeout(f, timeout, T)
98+
else
99+
f(Ref(false)) # `f` may check its argument to see if the timeout was reached.
100+
end
101+
end
102+
103+
istcptype(::Type{TCPSocket}) = true
104+
istcptype(::Type{<:IO}) = false
105+
106+
end # module Tunnel

src/clientlayers/ConnectionRequest.jl

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module ConnectionRequest
33
using URIs, Sockets, Base64, ConcurrentUtilities, ExceptionUnwrapping
44
import MbedTLS
55
import OpenSSL
6-
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions
6+
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions, ..Tunnel
77
import ..SOCKET_TYPE_TLS
88

99
islocalhost(host::AbstractString) = host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "0000:0000:0000:0000:0000:0000:0000:0001" || host == "0:0:0:0:0:0:0:1"
@@ -79,8 +79,31 @@ function connectionlayer(handler)
7979
IOType = sockettype(url, socket_type, socket_type_tls, get(kw, :sslconfig, nothing))
8080
start_time = time()
8181
try
82-
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...)
82+
if !isnothing(proxy) && req.url.scheme in ("https", "wss", "ws")
83+
target_IOType = sockettype(target_url, socket_type, socket_type_tls, get(kw, :sslconfig, nothing))
84+
85+
io = newtunnelconnection(;
86+
target_type=target_IOType,
87+
target_host=target_url.host,
88+
target_port=target_url.port,
89+
proxy_type=IOType,
90+
proxy_host=url.host,
91+
proxy_port=url.port,
92+
proxy_auth=header(req, "Proxy-Authorization"),
93+
connect_timeout,
94+
readtimeout,
95+
kw...
96+
)
97+
98+
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
99+
else
100+
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, connect_timeout=connect_timeout, kw...)
101+
end
83102
catch e
103+
if e isa StatusError
104+
return e.response
105+
end
106+
84107
if logerrors
85108
@error current_exceptions_to_string() type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
86109
end
@@ -92,32 +115,6 @@ function connectionlayer(handler)
92115

93116
shouldreuse = !(target_url.scheme in ("ws", "wss")) && !closeimmediately
94117
try
95-
if proxy !== nothing && target_url.scheme in ("https", "wss", "ws")
96-
shouldreuse = false
97-
# tunnel request
98-
if target_url.scheme in ("https", "wss")
99-
target_url = URI(target_url, port=443)
100-
elseif target_url.scheme in ("ws", ) && target_url.port == ""
101-
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
102-
end
103-
r = if readtimeout > 0
104-
try_with_timeout(readtimeout) do _
105-
connect_tunnel(io, target_url, req)
106-
end
107-
else
108-
connect_tunnel(io, target_url, req)
109-
end
110-
if r.status != 200
111-
close(io)
112-
return r
113-
end
114-
if target_url.scheme in ("https", "wss")
115-
InnerIOType = sockettype(target_url, socket_type, socket_type_tls, get(kw, :sslconfig, nothing))
116-
io = Connections.sslupgrade(InnerIOType, io, target_url.host; readtimeout=readtimeout, kw...)
117-
end
118-
req.headers = filter(x->x.first != "Proxy-Authorization", req.headers)
119-
end
120-
121118
stream = Stream(req.response, io)
122119
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
123120
catch e
@@ -208,20 +205,4 @@ function tls_socket_type(socket_type_tls::Union{Nothing, Type},
208205
end
209206
end
210207

211-
function connect_tunnel(io, target_url, req)
212-
target = "$(URIs.hoststring(target_url.host)):$(target_url.port)"
213-
@debug "📡 CONNECT HTTPS tunnel to $target"
214-
headers = Dict("Host" => target)
215-
if (auth = header(req, "Proxy-Authorization"); !isempty(auth))
216-
headers["Proxy-Authorization"] = auth
217-
end
218-
request = Request("CONNECT", target, headers)
219-
# @debug "connect_tunnel: writing headers"
220-
writeheaders(io, request)
221-
# @debug "connect_tunnel: reading headers"
222-
readheaders(io, request.response)
223-
# @debug "connect_tunnel: done reading headers"
224-
return request.response
225-
end
226-
227208
end # module ConnectionRequest

test/client.jl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,24 @@ end
4242
# returning 200 each time.
4343
proxy = listen(IPv4(0), 8082)
4444
try
45-
@async begin
45+
proxytask = @async begin
4646
sock = accept(proxy)
4747
while isopen(sock)
4848
line = readline(sock)
4949
@show 1, line
5050
isempty(line) && break
5151
end
5252
write(sock, "HTTP/1.1 200\r\n\r\n")
53-
# Test that we receive something that looks like a client hello
54-
# (indicating that we tried to upgrade the connection to TLS)
55-
line = readline(sock)
56-
@test startswith(line, "\x16")
53+
flush(sock)
54+
readline(sock)
5755
end
5856

59-
@test_throws HTTP.RequestError HTTP.head("https://$httpbin.com"; proxy="http://localhost:8082", readtimeout=1, retry=false)
57+
@test_throws HTTP.ConnectError HTTP.head("https://$httpbin.com"; proxy="http://localhost:8082", readtimeout=1, retry=false)
58+
59+
# Test that we receive something that looks like a client hello
60+
# (indicating that we tried to upgrade the connection to TLS)
61+
line = fetch(proxytask)
62+
@test startswith(line, "\x16")
6063
finally
6164
close(proxy)
6265
HTTP.Connections.closeall()

test/pool.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,14 @@ end
157157
@testset "Only one tunnel should be established with sequential requests" begin
158158
https_request_ip_through_proxy()
159159
https_request_ip_through_proxy()
160-
@test_broken connectcount == 1
160+
@test connectcount == 1
161161
end
162162

163163
@testset "parallell tunnels should be established with parallell requests" begin
164164
n_asyncgetters = 3
165165
asyncgetters = [@async https_request_ip_through_proxy() for _ in 1:n_asyncgetters]
166166
wait.(asyncgetters)
167-
@test_broken connectcount == n_asyncgetters
167+
@test connectcount == n_asyncgetters
168168
end
169169

170170
finally

0 commit comments

Comments
 (0)