From 1d9377bc5e2cc5c2dfe07a888368a9868db2e80a Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Tue, 13 Apr 2021 22:24:51 +0900 Subject: [PATCH 1/6] gRPC stream support Signed-off-by: Shikugawa --- src/dispatcher.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++ src/hostcalls.rs | 92 +++++++++++++++++++++++++++++++ src/traits.rs | 30 ++++++++++ src/types.rs | 1 + 4 files changed, 261 insertions(+) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index d9c9d491..7f3acafc 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -38,6 +38,10 @@ pub(crate) fn register_callout(token_id: u32) { DISPATCHER.with(|dispatcher| dispatcher.register_callout(token_id)); } +pub(crate) fn register_grpc_stream(token_id: u32) { + DISPATCHER.with(|dispatcher| dispatcher.register_grpc_stream(token_id)); +} + struct NoopRoot; impl Context for NoopRoot {} @@ -52,6 +56,7 @@ struct Dispatcher { http_streams: RefCell>>, active_id: Cell, callouts: RefCell>, + grpc_stream_callouts: RefCell>, } impl Dispatcher { @@ -65,6 +70,7 @@ impl Dispatcher { http_streams: RefCell::new(HashMap::new()), active_id: Cell::new(0), callouts: RefCell::new(HashMap::new()), + grpc_stream_callouts: RefCell::new(HashMap::new()), } } @@ -91,6 +97,17 @@ impl Dispatcher { } } + fn register_grpc_stream(&self, token_id: u32) { + if self + .grpc_stream_callouts + .borrow_mut() + .insert(token_id, self.active_id.get()) + .is_some() + { + panic!("duplicate token_id") + } + } + fn create_root_context(&self, context_id: u32) { let new_context = match self.new_root.get() { Some(f) => f(context_id), @@ -170,6 +187,99 @@ impl Dispatcher { } } + fn on_grpc_receive_initial_metadata(&self, token_id: u32, headers: u32) { + let context_id = self + .grpc_stream_callouts + .borrow_mut() + .get(&token_id) + .expect("invalid token_id") + .clone(); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_receive_initial_metadata(token_id, headers); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_receive_initial_metadata(token_id, headers); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_receive_initial_metadata(token_id, headers); + } + } + + fn on_grpc_receive_trailing_metadata(&self, token_id: u32, trailers: u32) { + let context_id = self + .grpc_stream_callouts + .borrow_mut() + .get(&token_id) + .expect("invalid token_id") + .clone(); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_receive_trailing_metadata(token_id, trailers); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_receive_trailing_metadata(token_id, trailers); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_receive_trailing_metadata(token_id, trailers); + } + } + + fn on_grpc_receive(&self, token_id: u32, response_size: usize) { + // TODO(shikugawa): migrate with gRPC callout tokens + let context_id = self + .grpc_stream_callouts + .borrow_mut() + .get(&token_id) + .expect("invalid token_id") + .clone(); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_receive(token_id, response_size); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_receive(token_id, response_size); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_receive(token_id, response_size); + } + } + + fn on_grpc_close(&self, token_id: u32, status_code: u32) { + // TODO(shikugawa): migrate with gRPC callout tokens + let context_id = self + .grpc_stream_callouts + .borrow_mut() + .remove(&token_id) + .expect("invalid token_id"); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_close(token_id, status_code); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_close(token_id, status_code); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_close(token_id, status_code); + } + } + fn on_done(&self, context_id: u32) -> bool { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); @@ -509,3 +619,31 @@ pub extern "C" fn proxy_on_http_call_response( dispatcher.on_http_call_response(token_id, num_headers, body_size, num_trailers) }) } + +#[no_mangle] +pub extern "C" fn proxy_on_grpc_receive_initial_metadata( + _context_id: u32, + token_id: u32, + headers: u32, +) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_initial_metadata(token_id, headers)) +} + +#[no_mangle] +pub extern "C" fn proxy_on_grpc_receive_trailing_metadata( + _context_id: u32, + token_id: u32, + trailers: u32, +) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_trailing_metadata(token_id, trailers)) +} + +#[no_mangle] +pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, response_size: usize) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive(token_id, response_size)) +} + +#[no_mangle] +pub extern "C" fn proxy_on_grpc_close(_context_id: u32, token_id: u32, status_code: u32) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_close(token_id, status_code)) +} diff --git a/src/hostcalls.rs b/src/hostcalls.rs index 8f37f2ae..8c9ec320 100644 --- a/src/hostcalls.rs +++ b/src/hostcalls.rs @@ -651,6 +651,98 @@ pub fn dispatch_http_call( } } +extern "C" { + fn proxy_grpc_stream( + grpc_service: *const u8, + grpc_service_size: usize, // TODO(shikugawa): remove grpc_service after next ABI released. + service_name_data: *const u8, + service_name_size: usize, + method_name_data: *const u8, + method_name_size: usize, + initial_metadata_data: *const u8, + initial_metadata_size: usize, + return_callout_id: *mut u32, + ) -> Status; +} + +pub fn create_grpc_stream( + cluster_name: &str, + service_name: &str, + method_name: &str, + initial_metadata: &str, +) -> Result { + unsafe { + let mut return_callout_id = 0; + match proxy_grpc_stream( + cluster_name.as_ptr(), + cluster_name.len(), + service_name.as_ptr(), + service_name.len(), + method_name.as_ptr(), + method_name.len(), + initial_metadata.as_ptr(), + initial_metadata.len(), + &mut return_callout_id, + ) { + Status::Ok => { + dispatcher::register_grpc_stream(return_callout_id); + Ok(return_callout_id) + } + Status::ParseFailure => Err(Status::ParseFailure), + Status::InternalFailure => Err(Status::InternalFailure), + status => panic!("unexpected status: {}", status as u32), + } + } +} + +extern "C" { + fn proxy_grpc_send( + token: u32, + message_ptr: *const u8, + message_len: usize, + end_stream: bool, + ) -> Status; +} + +pub fn grpc_send(token: u32, message: &str, end_stream: bool) -> Result<(), Status> { + unsafe { + match proxy_grpc_send(token, message.as_ptr(), message.len(), end_stream) { + Status::Ok => Ok(()), + Status::BadArgument => Err(Status::BadArgument), + Status::NotFound => Err(Status::NotFound), + status => panic!("unexpected status: {}", status as u32), + } + } +} + +extern "C" { + fn proxy_grpc_cancel(token_id: u32) -> Status; +} + +pub fn grpc_cancel(token_id: u32) -> Result<(), Status> { + unsafe { + match proxy_grpc_cancel(token_id) { + Status::Ok => Ok(()), + Status::NotFound => Err(Status::NotFound), + status => panic!("unexpected status: {}", status as u32), + } + } +} + +extern "C" { + fn proxy_grpc_close(token_id: u32) -> Status; +} + +pub fn grpc_close(token_id: u32) -> Result<(), Status> { + unsafe { + match proxy_grpc_close(token_id) { + Status::Ok => Ok(()), + Status::NotFound => Err(Status::NotFound), + status => panic!("unexpected status: {}", status as u32), + } + } +} + extern "C" { fn proxy_set_effective_context(context_id: u32) -> Status; } diff --git a/src/traits.rs b/src/traits.rs index 5b7fc4be..681dd9d4 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -90,6 +90,36 @@ pub trait Context { hostcalls::get_map(MapType::HttpCallResponseTrailers).unwrap() } + fn create_grpc_stream( + &self, + cluster_name: &str, + service_name: &str, + method_name: &str, + initial_metadata: &str, + ) -> Result { + hostcalls::create_grpc_stream(cluster_name, service_name, method_name, initial_metadata) + } + + fn grpc_send(&self, token_id: u32, message: &str, end_stream: bool) -> Result<(), Status> { + hostcalls::grpc_send(token_id, message, end_stream) + } + + fn grpc_cancel(&self, token_id: u32) -> Result<(), Status> { + hostcalls::grpc_cancel(token_id) + } + + fn grpc_close(&self, token_id: u32) -> Result<(), Status> { + hostcalls::grpc_close(token_id) + } + + fn on_grpc_receive_initial_metadata(&mut self, _token_id: u32, _headers: u32) {} + + fn on_grpc_receive_trailing_metadata(&mut self, _token_id: u32, _trailers: u32) {} + + fn on_grpc_receive(&mut self, _token_id: u32, _response_size: usize) {} + + fn on_grpc_close(&mut self, _token_id: u32, _status_code: u32) {} + fn on_done(&mut self) -> bool { true } diff --git a/src/types.rs b/src/types.rs index 855a414b..f92a2a96 100644 --- a/src/types.rs +++ b/src/types.rs @@ -42,6 +42,7 @@ pub enum Status { Ok = 0, NotFound = 1, BadArgument = 2, + ParseFailure = 4, Empty = 7, CasMismatch = 8, InternalFailure = 10, From f3fe65660d2223f30df05cd7767896bbea836c6b Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Mon, 10 May 2021 00:00:55 +0900 Subject: [PATCH 2/6] fix with reviews Signed-off-by: Shikugawa --- src/dispatcher.rs | 124 ++++++++++++++++++++++++++-------------------- src/hostcalls.rs | 32 +++++++++--- src/traits.rs | 38 +++++++------- 3 files changed, 115 insertions(+), 79 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 7bcf9b03..9a51085a 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -61,7 +61,7 @@ struct Dispatcher { active_id: Cell, callouts: RefCell>, grpc_callouts: RefCell>, - grpc_stream_tokens: RefCell>, + grpc_streams: RefCell>, } impl Dispatcher { @@ -76,7 +76,7 @@ impl Dispatcher { active_id: Cell::new(0), callouts: RefCell::new(HashMap::new()), grpc_callouts: RefCell::new(HashMap::new()), - grpc_stream_tokens: RefCell::new(HashMap::new()), + grpc_streams: RefCell::new(HashMap::new()), } } @@ -103,9 +103,9 @@ impl Dispatcher { } } - fn register_grpc_stream_tokens(&self, token_id: u32) { + fn register_grpc_stream(&self, token_id: u32) { if self - .grpc_stream_tokens + .grpc_streams .borrow_mut() .insert(token_id, self.active_id.get()) .is_some() @@ -416,6 +416,29 @@ impl Dispatcher { } } + fn on_grpc_receive_initial_metadata(&self, token_id: u32, headers: u32) { + let context_id = self + .grpc_streams + .borrow_mut() + .get(&token_id) + .expect("invalid token_id") + .clone(); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_stream_initial_metadata(token_id, headers); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_stream_initial_metadata(token_id, headers); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_stream_initial_metadata(token_id, headers); + } + } + fn on_grpc_receive(&self, token_id: u32, response_size: usize) { if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { @@ -431,26 +454,49 @@ impl Dispatcher { hostcalls::set_effective_context(context_id).unwrap(); root.on_grpc_call_response(token_id, 0, response_size); } - } else if let Some(context_id) = self.grpc_callouts.borrow_mut().get(&token_id) { + } else if let Some(context_id) = self.grpc_streams.borrow_mut().get(&token_id) { let context_id = context_id.clone(); if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - http_stream.on_grpc_stream_receive_body(token_id, response_size); + http_stream.on_grpc_stream_message(token_id, response_size); } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - stream.on_grpc_stream_receive_body(token_id, response_size); + stream.on_grpc_stream_message(token_id, response_size); } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - root.on_grpc_stream_receive_body(token_id, response_size); + root.on_grpc_stream_message(token_id, response_size); } } else { panic!("invalid token_id") } } + fn on_grpc_receive_trailing_metadata(&self, token_id: u32, trailers: u32) { + let context_id = self + .grpc_streams + .borrow_mut() + .get(&token_id) + .expect("invalid token_id") + .clone(); + + if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + http_stream.on_grpc_stream_initial_metadata(token_id, trailers); + } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + stream.on_grpc_stream_initial_metadata(token_id, trailers); + } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { + self.active_id.set(context_id); + hostcalls::set_effective_context(context_id).unwrap(); + root.on_grpc_stream_initial_metadata(token_id, trailers); + } + } + fn on_grpc_close(&self, token_id: u32, status_code: u32) { if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { @@ -466,7 +512,7 @@ impl Dispatcher { hostcalls::set_effective_context(context_id).unwrap(); root.on_grpc_call_response(token_id, status_code, 0); } - } else if let Some(context_id) = self.grpc_stream_tokens.borrow_mut().remove(&token_id) { + } else if let Some(context_id) = self.grpc_streams.borrow_mut().remove(&token_id) { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); @@ -484,52 +530,6 @@ impl Dispatcher { panic!("invalid token_id") } } - - fn on_grpc_receive_initial_metadata(&self, token_id: u32, headers: u32) { - let context_id = self - .grpc_stream_tokens - .borrow_mut() - .get(&token_id) - .expect("invalid token_id") - .clone(); - - if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - http_stream.on_grpc_stream_receive_initial_metadata(token_id, headers); - } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - stream.on_grpc_stream_receive_initial_metadata(token_id, headers); - } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - root.on_grpc_stream_receive_initial_metadata(token_id, headers); - } - } - - fn on_grpc_receive_trailing_metadata(&self, token_id: u32, trailers: u32) { - let context_id = self - .grpc_stream_tokens - .borrow_mut() - .get(&token_id) - .expect("invalid token_id") - .clone(); - - if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - http_stream.on_grpc_stream_receive_trailing_metadata(token_id, trailers); - } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - stream.on_grpc_stream_receive_trailing_metadata(token_id, trailers); - } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { - self.active_id.set(context_id); - hostcalls::set_effective_context(context_id).unwrap(); - root.on_grpc_stream_receive_trailing_metadata(token_id, trailers); - } - } } #[no_mangle] @@ -659,11 +659,25 @@ pub extern "C" fn proxy_on_http_call_response( }) } +#[no_mangle] +pub extern "C" fn proxy_on_grpc_receive_initial_metadata( + _context_id: u32, + token_id: u32, + headers: u32, +) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_initial_metadata(token_id, headers)) +} + #[no_mangle] pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, response_size: usize) { DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive(token_id, response_size)) } +#[no_mangle] +pub extern "C" fn proxy_on_grpc_trailing_metadata(_context_id: u32, token_id: u32, trailers: u32) { + DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_trailing_metadata(token_id, trailers)) +} + #[no_mangle] pub extern "C" fn proxy_on_grpc_close(_context_id: u32, token_id: u32, status_code: u32) { DISPATCHER.with(|dispatcher| dispatcher.on_grpc_close(token_id, status_code)) diff --git a/src/hostcalls.rs b/src/hostcalls.rs index e404818f..947d0559 100644 --- a/src/hostcalls.rs +++ b/src/hostcalls.rs @@ -177,6 +177,24 @@ pub fn get_map(map_type: MapType) -> Result, Status> { } } +pub fn get_map_bytes(map_type: MapType) -> Result)>, Status> { + unsafe { + let mut return_data: *mut u8 = null_mut(); + let mut return_size: usize = 0; + match proxy_get_header_map_pairs(map_type, &mut return_data, &mut return_size) { + Status::Ok => { + if !return_data.is_null() { + let serialized_map = Vec::from_raw_parts(return_data, return_size, return_size); + Ok(utils::deserialize_bytes_map(&serialized_map)) + } else { + Ok(Vec::new()) + } + } + status => panic!("unexpected status: {}", status as u32), + } + } +} + extern "C" { fn proxy_set_header_map_pairs( map_type: MapType, @@ -677,7 +695,7 @@ pub fn dispatch_grpc_call( timeout: Duration, ) -> Result { let mut return_callout_id = 0; - let serialized_initial_metadata = utils::serialize_bytes_value_map(initial_metadata); + let serialized_initial_metadata = utils::serialize_bytes_map(initial_metadata); unsafe { match proxy_grpc_call( upstream_name.as_ptr(), @@ -718,14 +736,14 @@ extern "C" { ) -> Status; } -pub fn create_grpc_stream( +pub fn open_grpc_stream( upstream_name: &str, service_name: &str, method_name: &str, initial_metadata: Vec<(&str, &[u8])>, ) -> Result { let mut return_stream_id = 0; - let serialized_initial_metadata = utils::serialize_bytes_value_map(initial_metadata); + let serialized_initial_metadata = utils::serialize_bytes_map(initial_metadata); unsafe { match proxy_grpc_stream( upstream_name.as_ptr(), @@ -758,7 +776,7 @@ extern "C" { ) -> Status; } -pub fn grpc_stream_send( +pub fn send_grpc_stream_message( token: u32, message: Option<&[u8]>, end_stream: bool, @@ -782,7 +800,7 @@ extern "C" { fn proxy_grpc_cancel(token_id: u32) -> Status; } -pub fn grpc_call_cancel(token_id: u32) -> Result<(), Status> { +pub fn cancel_grpc_call(token_id: u32) -> Result<(), Status> { unsafe { match proxy_grpc_cancel(token_id) { Status::Ok => Ok(()), @@ -796,7 +814,7 @@ extern "C" { fn proxy_grpc_close(token_id: u32) -> Status; } -pub fn grpc_stream_close(token_id: u32) -> Result<(), Status> { +pub fn close_grpc_stream(token_id: u32) -> Result<(), Status> { unsafe { match proxy_grpc_close(token_id) { Status::Ok => Ok(()), @@ -938,7 +956,7 @@ mod utils { bytes } - pub(super) fn serialize_bytes_value_map(map: Vec<(&str, &[u8])>) -> Bytes { + pub(super) fn serialize_bytes_map(map: Vec<(&str, &[u8])>) -> Bytes { let mut size: usize = 4; for (name, value) in &map { size += name.len() + value.len() + 10; diff --git a/src/traits.rs b/src/traits.rs index b741734d..b6125bd9 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -119,45 +119,49 @@ pub trait Context { hostcalls::cancel_grpc_call(token_id) } - fn create_grpc_stream( + fn open_grpc_stream( &self, cluster_name: &str, service_name: &str, method_name: &str, initial_metadata: Vec<(&str, &[u8])>, ) -> Result { - hostcalls::create_grpc_stream(cluster_name, service_name, method_name, initial_metadata) + hostcalls::open_grpc_stream(cluster_name, service_name, method_name, initial_metadata) } - fn grpc_stream_send( + fn on_grpc_stream_initial_metadata(&mut self, _token_id: u32, _num_elements: u32) {} + + fn get_grpc_stream_initial_metadata(&self) -> Vec<(String, Vec)> { + hostcalls::get_map_bytes(MapType::GrpcReceiveInitialMetadata).unwrap() + } + + fn send_grpc_stream_message( &self, token_id: u32, message: Option<&[u8]>, end_stream: bool, ) -> Result<(), Status> { - hostcalls::grpc_stream_send(token_id, message, end_stream) + hostcalls::send_grpc_stream_message(token_id, message, end_stream) } - fn grpc_stream_close(&self, token_id: u32) -> Result<(), Status> { - hostcalls::grpc_stream_close(token_id) - } - - fn on_grpc_stream_receive_initial_metadata(&mut self, _token_id: u32, _headers: u32) {} + fn on_grpc_stream_message(&mut self, _token_id: u32, _message_size: usize) {} - fn on_grpc_stream_receive_trailing_metadata(&mut self, _token_id: u32, _trailers: u32) {} - - fn on_grpc_stream_receive_body(&mut self, _token_id: u32, _response_size: usize) {} + fn get_grpc_stream_message(&mut self, start: usize, max_size: usize) -> Option { + hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, max_size).unwrap() + } - fn on_grpc_stream_close(&mut self, _token_id: u32, _status_code: u32) {} + fn on_grpc_stream_trailing_metadata(&mut self, _token_id: u32, _num_elements: u32) {} - fn get_grpc_call_initial_metadata(&self) -> Vec<(String, Vec)> { - hostcalls::get_map_bytes(MapType::GrpcReceiveInitialMetadata).unwrap() + fn get_grpc_stream_trailing_metadata(&self) -> Vec<(String, Vec)> { + hostcalls::get_map_bytes(MapType::GrpcReceiveTrailingMetadata).unwrap() } - fn get_grpc_call_trailing_metadata(&self) -> Vec<(String, Vec)> { - hostcalls::get_map_bytes(MapType::GrpcReceiveTrailingMetadata).unwrap() + fn close_grpc_stream(&self, token_id: u32) -> Result<(), Status> { + hostcalls::close_grpc_stream(token_id) } + fn on_grpc_stream_close(&mut self, _token_id: u32, _status_code: u32) {} + fn on_done(&mut self) -> bool { true } From e1551b25cfd1c12621162d14bbb10e876be6f715 Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Mon, 10 May 2021 00:40:02 +0900 Subject: [PATCH 3/6] clippy Signed-off-by: Shikugawa --- src/dispatcher.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 9a51085a..2db4f88d 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -417,12 +417,11 @@ impl Dispatcher { } fn on_grpc_receive_initial_metadata(&self, token_id: u32, headers: u32) { - let context_id = self + let context_id = *self .grpc_streams .borrow_mut() .get(&token_id) - .expect("invalid token_id") - .clone(); + .expect("invalid token_id"); if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); @@ -455,7 +454,7 @@ impl Dispatcher { root.on_grpc_call_response(token_id, 0, response_size); } } else if let Some(context_id) = self.grpc_streams.borrow_mut().get(&token_id) { - let context_id = context_id.clone(); + let context_id = *context_id; if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); @@ -475,12 +474,11 @@ impl Dispatcher { } fn on_grpc_receive_trailing_metadata(&self, token_id: u32, trailers: u32) { - let context_id = self + let context_id = *self .grpc_streams .borrow_mut() .get(&token_id) - .expect("invalid token_id") - .clone(); + .expect("invalid token_id"); if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); From ac7f114aa150cda15c0bcc0bdc1661615cf6d8ad Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Sat, 15 May 2021 11:54:24 +0900 Subject: [PATCH 4/6] fix Signed-off-by: Shikugawa --- src/dispatcher.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 2db4f88d..71c2509e 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -483,15 +483,15 @@ impl Dispatcher { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - http_stream.on_grpc_stream_initial_metadata(token_id, trailers); + http_stream.on_grpc_stream_trailing_metadata(token_id, trailers); } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - stream.on_grpc_stream_initial_metadata(token_id, trailers); + stream.on_grpc_stream_trailing_metadata(token_id, trailers); } else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); hostcalls::set_effective_context(context_id).unwrap(); - root.on_grpc_stream_initial_metadata(token_id, trailers); + root.on_grpc_stream_trailing_metadata(token_id, trailers); } } @@ -672,7 +672,7 @@ pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, respons } #[no_mangle] -pub extern "C" fn proxy_on_grpc_trailing_metadata(_context_id: u32, token_id: u32, trailers: u32) { +pub extern "C" fn proxy_receive_on_grpc_trailing_metadata(_context_id: u32, token_id: u32, trailers: u32) { DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_trailing_metadata(token_id, trailers)) } From 8e9169b560f4192100c31382400b506d73a5bacf Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Sat, 15 May 2021 12:07:22 +0900 Subject: [PATCH 5/6] fix Signed-off-by: Shikugawa --- src/dispatcher.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 71c2509e..0158f69b 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -672,7 +672,11 @@ pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, respons } #[no_mangle] -pub extern "C" fn proxy_receive_on_grpc_trailing_metadata(_context_id: u32, token_id: u32, trailers: u32) { +pub extern "C" fn proxy_on_receive_grpc_trailing_metadata( + _context_id: u32, + token_id: u32, + trailers: u32, +) { DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive_trailing_metadata(token_id, trailers)) } From d2aaf0ae06248d77bb2814eb0fc60198b372943c Mon Sep 17 00:00:00 2001 From: Shikugawa Date: Sat, 15 May 2021 18:23:44 +0900 Subject: [PATCH 6/6] fix Signed-off-by: Shikugawa --- src/dispatcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 0158f69b..58fb7a65 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -672,7 +672,7 @@ pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, respons } #[no_mangle] -pub extern "C" fn proxy_on_receive_grpc_trailing_metadata( +pub extern "C" fn proxy_on_grpc_receive_trailing_metadata( _context_id: u32, token_id: u32, trailers: u32,