From e0f7d4d76585c7b41ad720b9dd0769ce4b9b3ad7 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 05:15:54 +0000 Subject: [PATCH 1/8] wip - vnc proxy - first pass at plumbing tcp stream thru websocket --- nexus/src/app/instance.rs | 209 ++++++++++++++++++++- nexus/src/external_api/http_entrypoints.rs | 48 +++++ 2 files changed, 255 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index e29ed21192c..8eec498e494 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -58,6 +58,7 @@ use sled_agent_client::types::InstancePutStateBody; use std::matches; use std::net::SocketAddr; use std::sync::Arc; +use tokio::io::Interest; use tokio::io::{AsyncRead, AsyncWrite}; use uuid::Uuid; @@ -1710,7 +1711,7 @@ impl super::Nexus { instance_lookup: &lookup::Instance<'_>, params: ¶ms::InstanceSerialConsoleStreamRequest, ) -> Result<(), Error> { - let client_addr = match self + let propolis_addr = match self .propolis_addr_for_instance( opctx, instance_lookup, @@ -1737,7 +1738,7 @@ impl super::Nexus { }; match propolis_client::support::InstanceSerialConsoleHelper::new( - client_addr, + propolis_addr, offset, Some(self.log.clone()), ) @@ -1765,6 +1766,56 @@ impl super::Nexus { } } + pub(crate) async fn instance_vnc_stream( + &self, + opctx: &OpContext, + mut client_stream: WebSocketStream, + instance_lookup: &lookup::Instance<'_>, + ) -> Result<(), Error> { + const VNC_PORT: u16 = 5900; + + let propolis_ip = match self + .propolis_addr_for_instance( + opctx, + instance_lookup, + authz::Action::Modify, + ) + .await + { + Ok(x) => x.ip(), + Err(e) => { + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + })) + .await + .is_ok(); + return Err(e); + } + }; + + match tokio::net::TcpStream::connect((propolis_ip, VNC_PORT)).await { + Ok(propolis_conn) => { + Self::wrap_tcp_socket_ws(client_stream, propolis_conn) + .await + .map_err(|e| Error::internal_error(&format!("{}", e))) + } + Err(e) => { + let message = + format!("socket connection to instance VNC failed: {}", e); + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: message.clone().into(), + })) + .await + .is_ok(); + Err(Error::internal_error(&message)) + } + } + } + async fn propolis_addr_for_instance( &self, opctx: &OpContext, @@ -1976,6 +2027,160 @@ impl super::Nexus { Ok(()) } + /// Trivially pack data read from a TcpStream into binary websocket frames, + /// and unpack those received from the client accordingly. + /// NoVNC (a web VNC client) calls their version of this "websockify". + async fn wrap_tcp_socket_ws( + client_stream: WebSocketStream, + propolis_conn: tokio::net::TcpStream, + ) -> Result<(), propolis_client::support::tungstenite::Error> { + let (mut nexus_sink, mut nexus_stream) = client_stream.split(); + + // buffered_input is Some if there's a websocket message waiting to be + // sent from the client to propolis. + let mut buffered_input = None; + // buffered_output is Some if there's data waiting to be sent from + // propolis to the client. + let mut buffered_output = None; + + loop { + let nexus_read; + let nexus_reserve; + let propolis_readiness; + + // TODO: come up with a less arbitrary size + let mut propolis_read_buffer = vec![0u8; 65536]; + + if buffered_input.is_some() { + // We already have a buffered input -- do not read any further + // messages from the client. + nexus_read = Fuse::terminated(); + if buffered_output.is_some() { + // We already have a buffered output -- do not read any + // further messages from propolis. + nexus_reserve = nexus_sink.reserve().fuse(); + propolis_readiness = + propolis_conn.ready(Interest::WRITABLE).fuse(); + } else { + nexus_reserve = Fuse::terminated(); + // We have both a buffered input to send propolis, and an + // empty output buffer to receive from it. + propolis_readiness = propolis_conn + .ready(Interest::READABLE | Interest::WRITABLE) + .fuse(); + } + } else { + nexus_read = nexus_stream.next().fuse(); + if buffered_output.is_some() { + // We already have a buffered output -- do not read any + // further messages from propolis. + nexus_reserve = nexus_sink.reserve().fuse(); + propolis_readiness = Fuse::terminated(); + } else { + nexus_reserve = Fuse::terminated(); + propolis_readiness = + propolis_conn.ready(Interest::READABLE).fuse(); + } + } + + tokio::select! { + msg = nexus_read => { + match msg { + None => { + // websocket connection to nexus client closed unexpectedly + break; + } + Some(Err(e)) => { + // error in websocket connection to nexus client + return Err(e); + } + Some(Ok(WebSocketMessage::Close(_details))) => { + // websocket connection to nexus client closed normally + break; + } + Some(Ok(WebSocketMessage::Text(_text))) => { + // TODO: json payloads specifying client-sent metadata? + } + Some(Ok(WebSocketMessage::Binary(data))) => { + debug_assert!( + buffered_input.is_none(), + "attempted to drop buffered_input message ({buffered_input:?})", + ); + buffered_input = Some(data); + } + // Frame won't exist at this level, and ping reply is handled by tungstenite + Some(Ok(WebSocketMessage::Frame(_) | WebSocketMessage::Ping(_) | WebSocketMessage::Pong(_))) => {} + } + } + result = nexus_reserve => { + let permit = result?; + let message = buffered_output + .take() + .expect("nexus_reserve is only active when buffered_output is Some"); + permit.send(message)?.await?; + } + result = propolis_readiness => { + let ready = result?; + if ready.is_readable() { + match propolis_conn.try_read(&mut propolis_read_buffer) { + Ok(num_bytes) => { + let prev = buffered_output.replace(WebSocketMessage::Binary( + Vec::from(&propolis_read_buffer[..num_bytes]) + )); + if prev.is_some() { + panic!("propolis_readiness is only readable when buffered_output is None"); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // readiness event was a false-positive. ignore + continue; + } + Err(e) => { + nexus_sink.send(WebSocketMessage::Close(Some(CloseFrame { + code: CloseCode::Abnormal, + reason: std::borrow::Cow::from( + "nexus: TCP connection to VNC closed unexpectedly while reading" + ), + }))).await?; + return Err(e.into()); + } + } + } + if ready.is_writable() { + let data = buffered_input + .take() + .expect("propolis_readiness is only writable when buffered_input is Some"); + + match propolis_conn.try_write(&data) { + Ok(num_bytes) => { + if num_bytes == data.len() { + buffered_input = None; + } else { + buffered_input = Some(Vec::from(&data[num_bytes..])); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // readiness event was a false-positive. ignore + continue; + } + Err(e) => { + nexus_sink.send(WebSocketMessage::Close(Some(CloseFrame { + code: CloseCode::Abnormal, + reason: std::borrow::Cow::from( + "nexus: TCP connection to VNC closed unexpectedly while writing" + ), + }))).await?; + return Err(e.into()); + } + } + } + } + } + } + + Ok(()) + } + /// Attach an ephemeral IP to an instance. pub(crate) async fn instance_attach_ephemeral_ip( self: &Arc, diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 551ef00817e..c423a65d367 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -175,6 +175,7 @@ pub(crate) fn external_api() -> NexusApiDescription { api.register(instance_disk_detach)?; api.register(instance_serial_console)?; api.register(instance_serial_console_stream)?; + api.register(instance_vnc)?; api.register(instance_ssh_public_key_list)?; api.register(image_list)?; @@ -2787,6 +2788,53 @@ async fn instance_serial_console_stream( } } +/// Stream instance VNC framebuffer +#[channel { + protocol = WEBSOCKETS, + path = "/v1/instances/{instance}/vnc", + tags = ["instances"], +}] +async fn instance_vnc( + rqctx: RequestContext>, + path_params: Path, + query_params: Query, + conn: WebsocketConnection, +) -> WebsocketChannelResult { + let apictx = rqctx.context(); + let nexus = &apictx.nexus; + let path = path_params.into_inner(); + let query = query_params.into_inner(); + let opctx = crate::context::op_context_for_external_api(&rqctx).await?; + let instance_selector = params::InstanceSelector { + project: query.project.clone(), + instance: path.instance, + }; + let mut client_stream = WebSocketStream::from_raw_socket( + conn.into_inner(), + WebSocketRole::Server, + None, + ) + .await; + match nexus.instance_lookup(&opctx, instance_selector) { + Ok(instance_lookup) => { + nexus + .instance_vnc_stream(&opctx, client_stream, &instance_lookup) + .await?; + Ok(()) + } + Err(e) => { + let _ = client_stream + .close(Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + })) + .await + .is_ok(); + Err(e.into()) + } + } +} + /// List SSH public keys for instance /// /// List SSH public keys injected via cloud-init during instance creation. Note From 67ab4bff8b662704ccc260459096678477277043 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 06:16:28 +0000 Subject: [PATCH 2/8] wip - vnc proxy - openapi update --- openapi/nexus.json | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/openapi/nexus.json b/openapi/nexus.json index 7d236de7a33..6813b5e54e9 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -2676,6 +2676,45 @@ } } }, + "/v1/instances/{instance}/vnc": { + "get": { + "tags": [ + "instances" + ], + "summary": "Stream instance VNC framebuffer", + "operationId": "instance_vnc", + "parameters": [ + { + "in": "path", + "name": "instance", + "description": "Name or ID of the instance", + "required": true, + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + }, + { + "in": "query", + "name": "project", + "description": "Name or ID of the project", + "schema": { + "$ref": "#/components/schemas/NameOrId" + } + } + ], + "responses": { + "default": { + "description": "", + "content": { + "*/*": { + "schema": {} + } + } + } + }, + "x-dropshot-websocket": {} + } + }, "/v1/ip-pools": { "get": { "tags": [ From 661fab53342987b0f923803a5315a9a651513613 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 07:16:51 +0000 Subject: [PATCH 3/8] wip - vnc proxy - tags & error reword --- nexus/src/app/instance.rs | 3 +-- nexus/tests/output/nexus_tags.txt | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 8eec498e494..10e34b2a7cc 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -1854,8 +1854,7 @@ impl super::Nexus { } } else { Err(Error::invalid_request(format!( - "instance is {} and has no active serial console \ - server", + "instance is {} and has no active console server", instance.runtime().nexus_state ))) } diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt index 64413f396ea..7caf12a2693 100644 --- a/nexus/tests/output/nexus_tags.txt +++ b/nexus/tests/output/nexus_tags.txt @@ -64,6 +64,7 @@ instance_ssh_public_key_list GET /v1/instances/{instance}/ssh-p instance_start POST /v1/instances/{instance}/start instance_stop POST /v1/instances/{instance}/stop instance_view GET /v1/instances/{instance} +instance_vnc GET /v1/instances/{instance}/vnc API operations found with tag "login" OPERATION ID METHOD URL PATH From 9ce1941b06e0e0623a77123418dbd9d8e4ae0687 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 07:28:01 +0000 Subject: [PATCH 4/8] wip - vnc proxy - not sure why cargo builds are doing this but okay --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f7b9f27b1f3..e594a1b6e69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4062,7 +4062,7 @@ dependencies = [ "colored", "dlpi", "libc", - "num_enum 0.5.11", + "num_enum", "nvpair", "nvpair-sys", "rusty-doors", From 6824676be39f5b3ea09d7d472780a0e589cac297 Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 10:12:23 +0000 Subject: [PATCH 5/8] wip - vnc proxy - unit test --- nexus/src/app/instance.rs | 90 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 10e34b2a7cc..11480b57c92 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -2303,6 +2303,7 @@ mod tests { InstanceSerialConsoleHelper, WSClientOffset, }; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::test] async fn test_serial_console_stream_proxying() { @@ -2395,4 +2396,93 @@ mod tests { .expect("proxy task exited successfully"); logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_tcp_stream_proxying() { + let logctx = test_setup_log("test_tcp_stream_proxying"); + let (nexus_client_conn, nexus_server_conn) = tokio::io::duplex(1024); + let propolis_listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("couldn't make TcpListener"); + + let addr = propolis_listener.local_addr().unwrap(); + + let jh = tokio::spawn(async move { propolis_listener.accept().await }); + + let propolis_client_conn = tokio::net::TcpStream::connect(addr) + .await + .expect("couldn't open TcpStream connection to TcpListener"); + + let mut propolis_server_conn = jh + .await + .expect("couldn't join") + .expect("couldn't accept client connection from TcpListener") + .0; + + let jh = tokio::spawn(async move { + let nexus_client_stream = WebSocketStream::from_raw_socket( + nexus_server_conn, + Role::Server, + None, + ) + .await; + Nexus::wrap_tcp_socket_ws(nexus_client_stream, propolis_client_conn) + .await + }); + let mut nexus_client_ws = WebSocketStream::from_raw_socket( + nexus_client_conn, + Role::Client, + None, + ) + .await; + + slog::info!(logctx.log, "sending messages to nexus client"); + let sent1 = WebSocketMessage::Binary(vec![1, 2, 3, 42, 5]); + nexus_client_ws.send(sent1.clone()).await.unwrap(); + let sent2 = WebSocketMessage::Binary(vec![5, 42, 3, 2, 1]); + nexus_client_ws.send(sent2.clone()).await.unwrap(); + slog::info!( + logctx.log, + "messages sent, receiving them via propolis server" + ); + let received = + tokio::time::timeout(std::time::Duration::from_secs(10), async { + let mut buf = [0u8; 1024]; + let mut received = Vec::::new(); + while received.len() < 10 { + let bytes = + propolis_server_conn.read(&mut buf).await.unwrap(); + received.extend(&buf[..bytes]); + } + received + }) + .await + .expect("timed out receiving"); + assert_eq!(received, vec![1, 2, 3, 42, 5, 5, 42, 3, 2, 1]); + + slog::info!(logctx.log, "sending data to propolis server"); + let sent3 = vec![6, 7, 8, 90, 90, 8, 7, 6]; + propolis_server_conn.write_all(&sent3).await.unwrap(); + slog::info!(logctx.log, "data sent, receiving it via nexus client"); + let received3 = nexus_client_ws.next().await.unwrap().unwrap(); + assert_eq!(WebSocketMessage::Binary(sent3), received3); + + slog::info!(logctx.log, "sending close message to nexus client"); + let sent = WebSocketMessage::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: std::borrow::Cow::from("test done"), + })); + nexus_client_ws.send(sent.clone()).await.unwrap(); + slog::info!( + logctx.log, + "sent close message, waiting \ + 1s for proxy task to shut down" + ); + tokio::time::timeout(Duration::from_secs(1), jh) + .await + .expect("proxy task shut down within 1s") + .expect("task successfully completed") + .expect("proxy task exited successfully"); + logctx.cleanup_successful(); + } } From aa12d4bb4b341b548ee5ef7dbcebf12630716b4b Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 18:22:31 +0000 Subject: [PATCH 6/8] wip - vnc proxy - less ridiculous --- nexus/src/app/instance.rs | 187 +++++++++++++------------------------- 1 file changed, 65 insertions(+), 122 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 11480b57c92..12a54ab8888 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -58,8 +58,7 @@ use sled_agent_client::types::InstancePutStateBody; use std::matches; use std::net::SocketAddr; use std::sync::Arc; -use tokio::io::Interest; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use uuid::Uuid; type SledAgentClientError = @@ -1769,7 +1768,9 @@ impl super::Nexus { pub(crate) async fn instance_vnc_stream( &self, opctx: &OpContext, - mut client_stream: WebSocketStream, + mut client_stream: WebSocketStream< + impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + >, instance_lookup: &lookup::Instance<'_>, ) -> Result<(), Error> { const VNC_PORT: u16 = 5900; @@ -1797,7 +1798,7 @@ impl super::Nexus { match tokio::net::TcpStream::connect((propolis_ip, VNC_PORT)).await { Ok(propolis_conn) => { - Self::wrap_tcp_socket_ws(client_stream, propolis_conn) + Self::proxy_tcp_socket_ws(client_stream, propolis_conn) .await .map_err(|e| Error::internal_error(&format!("{}", e))) } @@ -2029,61 +2030,45 @@ impl super::Nexus { /// Trivially pack data read from a TcpStream into binary websocket frames, /// and unpack those received from the client accordingly. /// NoVNC (a web VNC client) calls their version of this "websockify". - async fn wrap_tcp_socket_ws( - client_stream: WebSocketStream, + async fn proxy_tcp_socket_ws( + client_stream: WebSocketStream< + impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + >, propolis_conn: tokio::net::TcpStream, ) -> Result<(), propolis_client::support::tungstenite::Error> { let (mut nexus_sink, mut nexus_stream) = client_stream.split(); - - // buffered_input is Some if there's a websocket message waiting to be - // sent from the client to propolis. - let mut buffered_input = None; - // buffered_output is Some if there's data waiting to be sent from - // propolis to the client. - let mut buffered_output = None; - - loop { - let nexus_read; - let nexus_reserve; - let propolis_readiness; - - // TODO: come up with a less arbitrary size - let mut propolis_read_buffer = vec![0u8; 65536]; - - if buffered_input.is_some() { - // We already have a buffered input -- do not read any further - // messages from the client. - nexus_read = Fuse::terminated(); - if buffered_output.is_some() { - // We already have a buffered output -- do not read any - // further messages from propolis. - nexus_reserve = nexus_sink.reserve().fuse(); - propolis_readiness = - propolis_conn.ready(Interest::WRITABLE).fuse(); - } else { - nexus_reserve = Fuse::terminated(); - // We have both a buffered input to send propolis, and an - // empty output buffer to receive from it. - propolis_readiness = propolis_conn - .ready(Interest::READABLE | Interest::WRITABLE) - .fuse(); - } - } else { - nexus_read = nexus_stream.next().fuse(); - if buffered_output.is_some() { - // We already have a buffered output -- do not read any - // further messages from propolis. - nexus_reserve = nexus_sink.reserve().fuse(); - propolis_readiness = Fuse::terminated(); - } else { - nexus_reserve = Fuse::terminated(); - propolis_readiness = - propolis_conn.ready(Interest::READABLE).fuse(); + let (mut propolis_reader, mut propolis_writer) = + propolis_conn.into_split(); + let (closed_tx, mut closed_rx) = tokio::sync::oneshot::channel::<()>(); + + let mut jh = tokio::spawn(async move { + // medium-sized websocket binary frame + let mut read_buffer = vec![0u8; 65536]; + loop { + tokio::select! { + _ = &mut closed_rx => break, + num_bytes_res = propolis_reader.read(&mut read_buffer) => { + let Ok(num_bytes) = num_bytes_res else { + let _ = nexus_sink.send(WebSocketMessage::Close(None)).await.is_ok(); + break; + }; + let data = Vec::from(&read_buffer[..num_bytes]); + match nexus_sink.send(WebSocketMessage::Binary(data)).await { + Ok(_) => {} + Err(_e) => break, + } + } } } + Ok::<_, Error>(nexus_sink) + }); + + let mut close_frame = None; + loop { tokio::select! { - msg = nexus_read => { + _ = &mut jh => break, + msg = nexus_stream.next() => { match msg { None => { // websocket connection to nexus client closed unexpectedly @@ -2101,82 +2086,37 @@ impl super::Nexus { // TODO: json payloads specifying client-sent metadata? } Some(Ok(WebSocketMessage::Binary(data))) => { - debug_assert!( - buffered_input.is_none(), - "attempted to drop buffered_input message ({buffered_input:?})", - ); - buffered_input = Some(data); - } - // Frame won't exist at this level, and ping reply is handled by tungstenite - Some(Ok(WebSocketMessage::Frame(_) | WebSocketMessage::Ping(_) | WebSocketMessage::Pong(_))) => {} - } - } - result = nexus_reserve => { - let permit = result?; - let message = buffered_output - .take() - .expect("nexus_reserve is only active when buffered_output is Some"); - permit.send(message)?.await?; - } - result = propolis_readiness => { - let ready = result?; - if ready.is_readable() { - match propolis_conn.try_read(&mut propolis_read_buffer) { - Ok(num_bytes) => { - let prev = buffered_output.replace(WebSocketMessage::Binary( - Vec::from(&propolis_read_buffer[..num_bytes]) - )); - if prev.is_some() { - panic!("propolis_readiness is only readable when buffered_output is None"); + let mut start = 0; + while start < data.len() { + match propolis_writer.write(&data[start..]).await { + Ok(num_bytes) => { + start += num_bytes; + } + Err(e) => { + close_frame = Some(CloseFrame { + code: CloseCode::Error, + reason: e.to_string().into(), + }); + break; + } } } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // readiness event was a false-positive. ignore - continue; - } - Err(e) => { - nexus_sink.send(WebSocketMessage::Close(Some(CloseFrame { - code: CloseCode::Abnormal, - reason: std::borrow::Cow::from( - "nexus: TCP connection to VNC closed unexpectedly while reading" - ), - }))).await?; - return Err(e.into()); - } - } - } - if ready.is_writable() { - let data = buffered_input - .take() - .expect("propolis_readiness is only writable when buffered_input is Some"); - - match propolis_conn.try_write(&data) { - Ok(num_bytes) => { - if num_bytes == data.len() { - buffered_input = None; - } else { - buffered_input = Some(Vec::from(&data[num_bytes..])); - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // readiness event was a false-positive. ignore - continue; - } - Err(e) => { - nexus_sink.send(WebSocketMessage::Close(Some(CloseFrame { - code: CloseCode::Abnormal, - reason: std::borrow::Cow::from( - "nexus: TCP connection to VNC closed unexpectedly while writing" - ), - }))).await?; - return Err(e.into()); - } } + // Frame won't exist at this level, and ping reply is handled by tungstenite + Some(Ok(WebSocketMessage::Frame(_) | WebSocketMessage::Ping(_) | WebSocketMessage::Pong(_))) => {} } } } } + let _ = closed_tx.send(()).is_ok(); + if let Ok(Ok(mut nexus_sink)) = jh.await { + let _ = nexus_sink + .send(WebSocketMessage::Close(close_frame)) + .await + .is_ok(); + } + Ok(()) } @@ -2426,8 +2366,11 @@ mod tests { None, ) .await; - Nexus::wrap_tcp_socket_ws(nexus_client_stream, propolis_client_conn) - .await + Nexus::proxy_tcp_socket_ws( + nexus_client_stream, + propolis_client_conn, + ) + .await }); let mut nexus_client_ws = WebSocketStream::from_raw_socket( nexus_client_conn, From 730429962ed4d5681298634597f6e5b1b05bc2ee Mon Sep 17 00:00:00 2001 From: lif <> Date: Wed, 27 Mar 2024 19:10:31 +0000 Subject: [PATCH 7/8] wip - vnc proxy - see if bumping up frame size improves perf --- nexus/src/app/instance.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 12a54ab8888..88b230d0963 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -2042,8 +2042,8 @@ impl super::Nexus { let (closed_tx, mut closed_rx) = tokio::sync::oneshot::channel::<()>(); let mut jh = tokio::spawn(async move { - // medium-sized websocket binary frame - let mut read_buffer = vec![0u8; 65536]; + // big enough for 1024x768 32bpp and then some + let mut read_buffer = vec![0u8; 4 * 1024 * 1024]; loop { tokio::select! { _ = &mut closed_rx => break, From 3d116a771a135290dcc0cbb4c7352d2769e21648 Mon Sep 17 00:00:00 2001 From: lif <> Date: Thu, 28 Mar 2024 07:13:47 +0000 Subject: [PATCH 8/8] wip - vnc proxy - avoid double join handle --- nexus/src/app/instance.rs | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 88b230d0963..539294ddbd1 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -2064,10 +2064,21 @@ impl super::Nexus { }); let mut close_frame = None; - + let mut task_joined = false; loop { tokio::select! { - _ = &mut jh => break, + res = &mut jh => { + task_joined = true; + if let Ok(Ok(mut nexus_sink)) = res { + // .take() here avoids borrow collision in the cleanup code + // below the loop where we also join the task if it hasn't been + let _ = nexus_sink + .send(WebSocketMessage::Close(close_frame.take())) + .await + .is_ok(); + } + break; + } msg = nexus_stream.next() => { match msg { None => { @@ -2109,12 +2120,15 @@ impl super::Nexus { } } - let _ = closed_tx.send(()).is_ok(); - if let Ok(Ok(mut nexus_sink)) = jh.await { - let _ = nexus_sink - .send(WebSocketMessage::Close(close_frame)) - .await - .is_ok(); + // double-joining a task handle is a panic + if !task_joined { + let _ = closed_tx.send(()).is_ok(); + if let Ok(Ok(mut nexus_sink)) = jh.await { + let _ = nexus_sink + .send(WebSocketMessage::Close(close_frame)) + .await + .is_ok(); + } } Ok(())