Skip to content

Commit b458a4f

Browse files
authored
transport: stop always closing connections when loopy returns (#6110)
1 parent 11e2506 commit b458a4f

File tree

4 files changed

+57
-38
lines changed

4 files changed

+57
-38
lines changed

internal/transport/controlbuf.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"bytes"
2323
"errors"
2424
"fmt"
25+
"net"
2526
"runtime"
2627
"strconv"
2728
"sync"
@@ -486,12 +487,13 @@ type loopyWriter struct {
486487
hEnc *hpack.Encoder // HPACK encoder.
487488
bdpEst *bdpEstimator
488489
draining bool
490+
conn net.Conn
489491

490492
// Side-specific handlers
491493
ssGoAwayHandler func(*goAway) (bool, error)
492494
}
493495

494-
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
496+
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
495497
var buf bytes.Buffer
496498
l := &loopyWriter{
497499
side: s,
@@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
504506
hBuf: &buf,
505507
hEnc: hpack.NewEncoder(&buf),
506508
bdpEst: bdpEst,
509+
conn: conn,
507510
}
508511
return l
509512
}
@@ -521,15 +524,27 @@ const minBatchSize = 1000
521524
// 2. Stream level flow control quota available.
522525
//
523526
// In each iteration of run loop, other than processing the incoming control
524-
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
525-
// This results in writing of HTTP2 frames into an underlying write buffer.
526-
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
527-
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
528-
// if the batch size is too low to give stream goroutines a chance to fill it up.
527+
// frame, loopy calls processData, which processes one node from the
528+
// activeStreams linked-list. This results in writing of HTTP2 frames into an
529+
// underlying write buffer. When there's no more control frames to read from
530+
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
531+
// the batch size for each flush, loopy yields the processor, once if the batch
532+
// size is too low to give stream goroutines a chance to fill it up.
533+
//
534+
// Upon exiting, if the error causing the exit is not an I/O error, run()
535+
// flushes and closes the underlying connection. Otherwise, the connection is
536+
// left open to allow the I/O error to be encountered by the reader instead.
529537
func (l *loopyWriter) run() (err error) {
530-
// Always flush the writer before exiting in case there are pending frames
531-
// to be sent.
532-
defer l.framer.writer.Flush()
538+
defer func() {
539+
if logger.V(logLevel) {
540+
logger.Infof("transport: loopyWriter exiting with error: %v", err)
541+
}
542+
if !isIOError(err) {
543+
l.framer.writer.Flush()
544+
l.conn.Close()
545+
}
546+
l.cbuf.finish()
547+
}()
533548
for {
534549
it, err := l.cbuf.get(true)
535550
if err != nil {
@@ -757,6 +772,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
757772
}
758773
}
759774
if l.draining && len(l.estdStreams) == 0 {
775+
// Flush and close the connection; we are done with it.
760776
return errors.New("finished processing active streams while in draining mode")
761777
}
762778
return nil
@@ -792,6 +808,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
792808
if l.side == clientSide {
793809
l.draining = true
794810
if len(l.estdStreams) == 0 {
811+
// Flush and close the connection; we are done with it.
795812
return errors.New("received GOAWAY with no active streams")
796813
}
797814
}
@@ -810,13 +827,6 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
810827
return nil
811828
}
812829

813-
func (l *loopyWriter) closeConnectionHandler() error {
814-
// Exit loopyWriter entirely by returning an error here. This will lead to
815-
// the transport closing the connection, and, ultimately, transport
816-
// closure.
817-
return ErrConnClosing
818-
}
819-
820830
func (l *loopyWriter) handle(i interface{}) error {
821831
switch i := i.(type) {
822832
case *incomingWindowUpdate:
@@ -846,7 +856,9 @@ func (l *loopyWriter) handle(i interface{}) error {
846856
case *outFlowControlSizeRequest:
847857
l.outFlowControlSizeRequestHandler(i)
848858
case closeConnection:
849-
return l.closeConnectionHandler()
859+
// Just return a non-I/O error and run() will flush and close the
860+
// connection.
861+
return ErrConnClosing
850862
default:
851863
return fmt.Errorf("transport: unknown control message type %T", i)
852864
}
@@ -905,7 +917,7 @@ func (l *loopyWriter) processData() (bool, error) {
905917
return false, err
906918
}
907919
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
908-
return false, nil
920+
return false, err
909921
}
910922
} else {
911923
l.activeStreams.enqueue(str)

internal/transport/http2_client.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
444444
return nil, err
445445
}
446446
go func() {
447-
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
448-
err := t.loopy.run()
449-
if logger.V(logLevel) {
450-
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
451-
}
452-
// Do not close the transport. Let reader goroutine handle it since
453-
// there might be data in the buffers.
454-
t.conn.Close()
455-
t.controlBuf.finish()
447+
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
448+
t.loopy.run()
456449
close(t.writerDone)
457450
}()
458451
return t, nil

internal/transport/http2_server.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
331331
t.handleSettings(sf)
332332

333333
go func() {
334-
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
334+
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
335335
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
336-
err := t.loopy.run()
337-
if logger.V(logLevel) {
338-
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
339-
}
340-
t.conn.Close()
341-
t.controlBuf.finish()
336+
t.loopy.run()
342337
close(t.writerDone)
343338
}()
344339
go t.keepalive()
@@ -1355,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
13551350
return false, err
13561351
}
13571352
if retErr != nil {
1358-
// Abruptly close the connection following the GoAway (via
1359-
// loopywriter). But flush out what's inside the buffer first.
1360-
t.framer.writer.Flush()
13611353
return false, retErr
13621354
}
13631355
return true, nil

internal/transport/http_util.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package transport
2121
import (
2222
"bufio"
2323
"encoding/base64"
24+
"errors"
2425
"fmt"
2526
"io"
2627
"math"
@@ -330,7 +331,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
330331
return 0, w.err
331332
}
332333
if w.batchSize == 0 { // Buffer has been disabled.
333-
return w.conn.Write(b)
334+
n, err = w.conn.Write(b)
335+
return n, toIOError(err)
334336
}
335337
for len(b) > 0 {
336338
nn := copy(w.buf[w.offset:], b)
@@ -352,10 +354,30 @@ func (w *bufWriter) Flush() error {
352354
return nil
353355
}
354356
_, w.err = w.conn.Write(w.buf[:w.offset])
357+
w.err = toIOError(w.err)
355358
w.offset = 0
356359
return w.err
357360
}
358361

362+
type ioError struct {
363+
error
364+
}
365+
366+
func (i ioError) Unwrap() error {
367+
return i.error
368+
}
369+
370+
func isIOError(err error) bool {
371+
return errors.As(err, &ioError{})
372+
}
373+
374+
func toIOError(err error) error {
375+
if err == nil {
376+
return nil
377+
}
378+
return ioError{error: err}
379+
}
380+
359381
type framer struct {
360382
writer *bufWriter
361383
fr *http2.Framer

0 commit comments

Comments
 (0)