@@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
229229 t .Fatalf ("client.EmptyCall() returned %v; want status with code %v" , err , codes .Canceled )
230230 }
231231}
232+
233+ // Test verifies that a client-side cancellation correctly frees up resources on
234+ // the server. The test setup is designed to simulate a scenario where a server
235+ // is blocked from sending a large message due to a full client-side flow
236+ // control window. The client-side cancellation of this blocked RPC then frees
237+ // up the max concurrent streams quota on the server, allowing a new RPC to be
238+ // created successfully.
239+ func (s ) TestCancelWhileServerWaitingForFlowControl (t * testing.T ) {
240+ serverDoneCh := make (chan struct {}, 2 )
241+ const flowControlWindowSize = 65535
242+ ss := & stubserver.StubServer {
243+ StreamingOutputCallF : func (_ * testpb.StreamingOutputCallRequest , stream testpb.TestService_StreamingOutputCallServer ) error {
244+ // Send a large message to exhaust the client's flow control window.
245+ stream .Send (& testpb.StreamingOutputCallResponse {
246+ Payload : & testpb.Payload {
247+ Body : make ([]byte , flowControlWindowSize + 1 ),
248+ },
249+ })
250+ serverDoneCh <- struct {}{}
251+ return nil
252+ },
253+ }
254+
255+ // Create a server that allows only 1 stream at a time.
256+ ss = stubserver .StartTestService (t , ss , grpc .MaxConcurrentStreams (1 ))
257+ defer ss .Stop ()
258+ // Use a static flow control window.
259+ if err := ss .StartClient (grpc .WithStaticStreamWindowSize (flowControlWindowSize )); err != nil {
260+ t .Fatalf ("Error while start test service client: %v" , err )
261+ }
262+ client := ss .Client
263+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
264+ defer cancel ()
265+
266+ streamCtx , streamCancel := context .WithCancel (ctx )
267+ defer streamCancel ()
268+
269+ if _ , err := client .StreamingOutputCall (streamCtx , & testpb.StreamingOutputCallRequest {}); err != nil {
270+ t .Fatalf ("Failed to create server streaming RPC: %v" , err )
271+ }
272+
273+ // Wait for the server handler to return. This should cause the trailers to
274+ // be buffered on the server, waiting for flow control quota to first send
275+ // the data frame.
276+ select {
277+ case <- ctx .Done ():
278+ t .Fatal ("Context timed out waiting for server handler to return." )
279+ case <- serverDoneCh :
280+ }
281+
282+ // Attempt to create a stream. It should fail since the previous stream is
283+ // still blocked.
284+ shortCtx , shortCancel := context .WithTimeout (ctx , defaultTestShortTimeout )
285+ defer shortCancel ()
286+ _ , err := client .StreamingOutputCall (shortCtx , & testpb.StreamingOutputCallRequest {})
287+ if status .Code (err ) != codes .DeadlineExceeded {
288+ t .Fatalf ("Server stream creation returned error with unexpected status code: %v, want code: %v" , err , codes .DeadlineExceeded )
289+ }
290+
291+ // Cancel the RPC, this should free up concurrent stream quota on the
292+ // server.
293+ streamCancel ()
294+
295+ // Attempt to create another stream.
296+ stream , err := client .StreamingOutputCall (ctx , & testpb.StreamingOutputCallRequest {})
297+ if err != nil {
298+ t .Fatalf ("Failed to create server streaming RPC: %v" , err )
299+ }
300+ _ , err = stream .Recv ()
301+ if err != nil {
302+ t .Fatalf ("Failed to read from the stream: %v" , err )
303+ }
304+ }
0 commit comments