From 5f63f1b5be78874558ce0dede5c64f44d4ca4157 Mon Sep 17 00:00:00 2001
From: Qingyang Hu
Date: Thu, 17 Nov 2022 10:32:03 -0500
Subject: [PATCH 1/4] GODRIVER-2577 Retry heartbeat on timeout to prevent pool
 cleanup during a FaaS pause.

---
 x/mongo/driver/topology/server.go      |  56 +++++++----
 x/mongo/driver/topology/server_test.go | 124 +++++++++++++++++++++++++
 2 files changed, 164 insertions(+), 16 deletions(-)

diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go
index bdd9035ead..4c2d0c35c2 100644
--- a/x/mongo/driver/topology/server.go
+++ b/x/mongo/driver/topology/server.go
@@ -520,6 +520,7 @@ func (s *Server) update() {
 		}
 	}
 
+	timeoutCnt := 0
 	for {
 		// Check if the server is disconnecting. Even if waitForNextCheck has already read from the done channel, we
 		// can safely read from it again because Disconnect closes the channel.
@@ -545,18 +546,35 @@ func (s *Server) update() {
 			continue
 		}
 
-		// Must hold the processErrorLock while updating the server description and clearing the
-		// pool. Not holding the lock leads to possible out-of-order processing of pool.clear() and
-		// pool.ready() calls from concurrent server description updates.
-		s.processErrorLock.Lock()
-		s.updateDescription(desc)
-		if err := desc.LastError; err != nil {
-			// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
-			// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
-			// IDs.
-			s.pool.clear(err, nil)
+		if isShortcut := func() bool {
+			// Must hold the processErrorLock while updating the server description and clearing the
+			// pool. Not holding the lock leads to possible out-of-order processing of pool.clear() and
+			// pool.ready() calls from concurrent server description updates.
+			s.processErrorLock.Lock()
+			defer s.processErrorLock.Unlock()
+
+			s.updateDescription(desc)
+			if lastError := desc.LastError; lastError != nil {
+				// Retry after the first timeout in case of a FaaS pause as described in GODRIVER-2577.
+				if err := unwrapConnectionError(lastError); err != nil {
+					err, ok := err.(net.Error)
+					if ok && err.Timeout() && timeoutCnt < 1 {
+						timeoutCnt++
+						// Continue to next loop.
+						return true
+					}
+				}
+				// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
+				// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
+				// IDs.
+				s.pool.clear(lastError, nil)
+			}
+			timeoutCnt = 0
+			// Run forward.
+			return false
+		}(); isShortcut {
+			continue
 		}
-		s.processErrorLock.Unlock()
 
 		// If the server supports streaming or we're already streaming, we want to move to streaming the next response
 		// without waiting. If the server has transitioned to Unknown from a network error, we want to do another
@@ -707,28 +725,34 @@ func (s *Server) check() (description.Server, error) {
 	var err error
 	var durationNanos int64
 
+	previousDescription := s.Description()
+	streamable := previousDescription.TopologyVersion != nil
+
+	isNilConn := s.conn == nil
+	if !isNilConn {
+		s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
+	}
+
 	// Create a new connection if this is the first check, the connection was closed after an error during the previous
 	// check, or the previous check was cancelled.
-	if s.conn == nil || s.conn.closed() || s.checkWasCancelled() {
+	if isNilConn || s.conn.closed() || s.checkWasCancelled() {
 		// Create a new connection and add its handshake RTT as a sample.
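+		// (The heartbeat connection handshake runs the initial "hello" command, so its measured
+		// RTT, stored in conn.helloRTT, serves as the first sample for the RTT monitor.)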
 		err = s.setupHeartbeatConnection()
 		if err == nil {
 			// Use the description from the connection handshake as the value for this check.
 			s.rttMonitor.addSample(s.conn.helloRTT)
 			descPtr = &s.conn.desc
+		} else {
+			err = unwrapConnectionError(err)
 		}
 	}
 
-	if descPtr == nil && err == nil {
+	if !isNilConn && err == nil {
 		// An existing connection is being used. Use the server description properties to execute the right heartbeat.
 
 		// Wrap conn in a type that implements driver.StreamerConnection.
 		heartbeatConn := initConnection{s.conn}
 		baseOperation := s.createBaseOperation(heartbeatConn)
-		previousDescription := s.Description()
-		streamable := previousDescription.TopologyVersion != nil
-		s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
 
 		start := time.Now()
 		switch {
 		case s.conn.getCurrentlyStreaming():
diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go
index 9850665db1..51af426d18 100644
--- a/x/mongo/driver/topology/server_test.go
+++ b/x/mongo/driver/topology/server_test.go
@@ -49,6 +49,130 @@ func (cncd *channelNetConnDialer) DialContext(_ context.Context, _, _ string) (n
 	return cnc, nil
 }
 
+type errorQueue struct {
+	errors []error
+	mutex  sync.Mutex
+}
+
+func (eq *errorQueue) head() error {
+	eq.mutex.Lock()
+	defer eq.mutex.Unlock()
+	if len(eq.errors) > 0 {
+		return eq.errors[0]
+	}
+	return nil
+}
+
+func (eq *errorQueue) dequeue() bool {
+	eq.mutex.Lock()
+	defer eq.mutex.Unlock()
+	if len(eq.errors) > 0 {
+		eq.errors = eq.errors[1:]
+		return true
+	}
+	return false
+}
+
+type timeoutConn struct {
+	net.Conn
+	errors *errorQueue
+}
+
+func (c *timeoutConn) Read(b []byte) (int, error) {
+	n, err := 0, c.errors.head()
+	if err == nil {
+		n, err = c.Conn.Read(b)
+	}
+	return n, err
+}
+
+func (c *timeoutConn) Write(b []byte) (int, error) {
+	n, err := 0, c.errors.head()
+	if err == nil {
+		n, err = c.Conn.Write(b)
+	}
+	return n, err
+}
+
+type timeoutDialer struct {
+	Dialer
+	errors *errorQueue
+}
+
+func (d *timeoutDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
+	var dialer net.Dialer
+	c, e := dialer.DialContext(ctx, network, address)
+	return &timeoutConn{c, d.errors}, e
+}
+
+type timeoutErr struct {
+	net.UnknownNetworkError
+}
+
+func (e *timeoutErr) Timeout() bool {
+	return true
+}
+
+var timeout = &timeoutErr{"test timeout"}
+
+// TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
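+// A timeoutConn returns the head of the error queue from Read and Write without consuming it;
+// each ServerHeartbeatStarted event dequeues one entry, so every ioErrors element drives one
+// heartbeat attempt: a nil entry lets the check proceed, and a timeout entry fails it.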
+func TestServerHeartbeatTimeout(t *testing.T) {
+	testCases := []struct {
+		desc              string
+		ioErrors          []error
+		expectPoolCleared bool
+	}{
+		{
+			desc:              "a single timeout should not clear the pool",
+			ioErrors:          []error{nil, timeout, nil, timeout, nil},
+			expectPoolCleared: false,
+		},
+		{
+			desc:              "continuous timeouts should clear the pool",
+			ioErrors:          []error{nil, timeout, timeout, nil},
+			expectPoolCleared: true,
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.desc, func(t *testing.T) {
+			t.Parallel()
+
+			var wg sync.WaitGroup
+			wg.Add(1)
+
+			errors := &errorQueue{errors: tc.ioErrors}
+			tpm := monitor.NewTestPoolMonitor()
+			server := NewServer(
+				address.Address("localhost:27017"),
+				primitive.NewObjectID(),
+				WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor {
+					return tpm.PoolMonitor
+				}),
+				WithConnectionOptions(func(opts ...ConnectionOption) []ConnectionOption {
+					return append(opts,
+						WithDialer(func(d Dialer) Dialer {
+							var dialer net.Dialer
+							return &timeoutDialer{&dialer, errors}
+						}))
+				}),
+				WithServerMonitor(func(*event.ServerMonitor) *event.ServerMonitor {
+					return &event.ServerMonitor{
+						ServerHeartbeatStarted: func(e *event.ServerHeartbeatStartedEvent) {
+							if !errors.dequeue() {
+								wg.Done()
+							}
+						},
+					}
+				}),
+			)
+			require.NoError(t, server.Connect(nil))
+			wg.Wait()
+			assert.Equal(t, tc.expectPoolCleared, tpm.IsPoolCleared(), "expected pool cleared to be %v but was %v", tc.expectPoolCleared, tpm.IsPoolCleared())
+		})
+	}
+}
+
 // TestServerConnectionTimeout tests how different timeout errors are handled during connection
 // creation and server handshake.
 func TestServerConnectionTimeout(t *testing.T) {

From 28c8df9a29fb16aae391f496496bd9357ae640c7 Mon Sep 17 00:00:00 2001
From: Qingyang Hu
Date: Fri, 18 Nov 2022 15:32:02 -0500
Subject: [PATCH 2/4] GODRIVER-2577 Fix SSL dialer

---
 x/mongo/driver/topology/server_test.go | 44 ++++++++++++++++++--------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go
index 51af426d18..80877f2644 100644
--- a/x/mongo/driver/topology/server_test.go
+++ b/x/mongo/driver/topology/server_test.go
@@ -11,8 +11,12 @@ package topology
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"errors"
+	"io/ioutil"
 	"net"
+	"os"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -100,23 +104,34 @@ type timeoutDialer struct {
 }
 
 func (d *timeoutDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
-	var dialer net.Dialer
-	c, e := dialer.DialContext(ctx, network, address)
-	return &timeoutConn{c, d.errors}, e
-}
+	c, e := d.Dialer.DialContext(ctx, network, address)
 
-type timeoutErr struct {
-	net.UnknownNetworkError
-}
+	if caFile := os.Getenv("MONGO_GO_DRIVER_CA_FILE"); len(caFile) > 0 {
+		pem, err := ioutil.ReadFile(caFile)
+		if err != nil {
+			return nil, err
+		}
 
-func (e *timeoutErr) Timeout() bool {
-	return true
-}
+		ca := x509.NewCertPool()
+		if !ca.AppendCertsFromPEM(pem) {
+			return nil, errors.New("unable to load CA file")
+		}
 
-var timeout = &timeoutErr{"test timeout"}
+		config := &tls.Config{
+			InsecureSkipVerify: true,
+			RootCAs:            ca,
+		}
+		c = tls.Client(c, config)
+	}
+	return &timeoutConn{c, d.errors}, e
+}
 
 // TestServerHeartbeatTimeout tests timeout retry for GODRIVER-2577.
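 // A timeoutConn returns the head of the error queue from Read and Write without consuming it;
 // each ServerHeartbeatStarted event dequeues one entry, so every ioErrors element drives one
 // heartbeat attempt: a nil entry lets the check proceed, and a timeout entry fails it.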
 func TestServerHeartbeatTimeout(t *testing.T) {
+	networkTimeoutError := &net.DNSError{
+		IsTimeout: true,
+	}
+
 	testCases := []struct {
 		desc              string
 		ioErrors          []error
 		expectPoolCleared bool
 	}{
 		{
 			desc:              "a single timeout should not clear the pool",
-			ioErrors:          []error{nil, timeout, nil, timeout, nil},
+			ioErrors:          []error{nil, networkTimeoutError, nil, networkTimeoutError, nil},
 			expectPoolCleared: false,
 		},
 		{
 			desc:              "continuous timeouts should clear the pool",
-			ioErrors:          []error{nil, timeout, timeout, nil},
+			ioErrors:          []error{nil, networkTimeoutError, networkTimeoutError, nil},
 			expectPoolCleared: true,
 		},
 	}
@@ -165,6 +180,9 @@ func TestServerHeartbeatTimeout(t *testing.T) {
 					},
 				}
 			}),
+			WithHeartbeatInterval(func(time.Duration) time.Duration {
+				return 2 * time.Second
+			}),
 		)
 		require.NoError(t, server.Connect(nil))
 		wg.Wait()

From 486ba910740db1d9fe811603973a9d0652aaccc4 Mon Sep 17 00:00:00 2001
From: Qingyang Hu
Date: Tue, 22 Nov 2022 12:34:20 -0500
Subject: [PATCH 3/4] GODRIVER-2577 Polish after review.

---
 x/mongo/driver/topology/server.go      | 27 ++++++++++++++++----------
 x/mongo/driver/topology/server_test.go |  2 +-
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go
index 4c2d0c35c2..6253e5143f 100644
--- a/x/mongo/driver/topology/server.go
+++ b/x/mongo/driver/topology/server.go
@@ -554,23 +554,30 @@ func (s *Server) update() {
 			defer s.processErrorLock.Unlock()
 
 			s.updateDescription(desc)
-			if lastError := desc.LastError; lastError != nil {
-				// Retry after the first timeout in case of a FaaS pause as described in GODRIVER-2577.
-				if err := unwrapConnectionError(lastError); err != nil {
-					err, ok := err.(net.Error)
-					if ok && err.Timeout() && timeoutCnt < 1 {
-						timeoutCnt++
-						// Continue to next loop.
-						return true
-					}
+			// Retry after the first timeout before clearing the pool in case of a FaaS pause as
+			// described in GODRIVER-2577.
+			if err := unwrapConnectionError(desc.LastError); err != nil && timeoutCnt < 1 {
+				if err == context.Canceled || err == context.DeadlineExceeded {
+					timeoutCnt++
+					// We want to retry immediately on a timeout error. Continue to the next loop.
+					return true
 				}
+				if err, ok := err.(net.Error); ok && err.Timeout() {
+					timeoutCnt++
+					// We want to retry immediately on a timeout error. Continue to the next loop.
+					return true
+				}
+			}
+			if err := desc.LastError; err != nil {
 				// Clear the pool once the description has been updated to Unknown. Pass in a nil service ID to clear
 				// because the monitoring routine only runs for non-load balanced deployments in which servers don't return
 				// IDs.
-				s.pool.clear(lastError, nil)
+				s.pool.clear(err, nil)
 			}
+			// We're either not handling a timeout error, or we just handled the 2nd consecutive
+			// timeout error. In either case, reset the timeout count to 0 and return false to
+			// continue the normal check process.
 			timeoutCnt = 0
-			// Run forward.
 			return false
 		}(); isShortcut {
 			continue
diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go
index 80877f2644..ecb001e311 100644
--- a/x/mongo/driver/topology/server_test.go
+++ b/x/mongo/driver/topology/server_test.go
@@ -181,7 +181,7 @@ func TestServerHeartbeatTimeout(t *testing.T) {
 				}
 			}),
 			WithHeartbeatInterval(func(time.Duration) time.Duration {
-				return 2 * time.Second
+				return 200 * time.Millisecond
 			}),
 		)
 		require.NoError(t, server.Connect(nil))

From 29a14d01e31b6f39f92da66aa430271b76c61aed Mon Sep 17 00:00:00 2001
From: Qingyang Hu
Date: Thu, 1 Dec 2022 10:27:31 -0500
Subject: [PATCH 4/4] GODRIVER-2577 Fix the logic of starting a heartbeat.

---
 x/mongo/driver/topology/server.go | 33 ++++++++++++++++++-------------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go
index 6253e5143f..d416f6c195 100644
--- a/x/mongo/driver/topology/server.go
+++ b/x/mongo/driver/topology/server.go
@@ -732,35 +732,40 @@ func (s *Server) check() (description.Server, error) {
 	var err error
 	var durationNanos int64
 
-	previousDescription := s.Description()
-	streamable := previousDescription.TopologyVersion != nil
-
-	isNilConn := s.conn == nil
-	if !isNilConn {
-		s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
-	}
-
-	// Create a new connection if this is the first check, the connection was closed after an error during the previous
-	// check, or the previous check was cancelled.
-	if isNilConn || s.conn.closed() || s.checkWasCancelled() {
+	start := time.Now()
+	if s.conn == nil || s.conn.closed() || s.checkWasCancelled() {
+		// Create a new connection if this is the first check, the connection was closed after an error during the previous
+		// check, or the previous check was cancelled.
+		isNilConn := s.conn == nil
+		if !isNilConn {
+			s.publishServerHeartbeatStartedEvent(s.conn.ID(), false)
+		}
 		// Create a new connection and add its handshake RTT as a sample.
 		// (The heartbeat connection handshake runs the initial "hello" command, so its measured
 		// RTT, stored in conn.helloRTT, serves as the first sample for the RTT monitor.)
 		err = s.setupHeartbeatConnection()
+		durationNanos = time.Since(start).Nanoseconds()
 		if err == nil {
 			// Use the description from the connection handshake as the value for this check.
 			s.rttMonitor.addSample(s.conn.helloRTT)
 			descPtr = &s.conn.desc
+			if !isNilConn {
+				s.publishServerHeartbeatSucceededEvent(s.conn.ID(), durationNanos, s.conn.desc, false)
+			}
 		} else {
 			err = unwrapConnectionError(err)
+			if !isNilConn {
+				s.publishServerHeartbeatFailedEvent(s.conn.ID(), durationNanos, err, false)
+			}
 		}
-	}
-
-	if !isNilConn && err == nil {
+	} else {
 		// An existing connection is being used. Use the server description properties to execute the right heartbeat.
 
 		// Wrap conn in a type that implements driver.StreamerConnection.
 		heartbeatConn := initConnection{s.conn}
 		baseOperation := s.createBaseOperation(heartbeatConn)
+		previousDescription := s.Description()
+		streamable := previousDescription.TopologyVersion != nil
 
-		start := time.Now()
+		s.publishServerHeartbeatStartedEvent(s.conn.ID(), s.conn.getCurrentlyStreaming() || streamable)
 		switch {
 		case s.conn.getCurrentlyStreaming():
 			// The connection is already in a streaming state, so we stream the next response.
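
For reference, the following is a minimal, standalone sketch (not part of the patch series) of the timeout classification that the final version of update() applies before deciding whether to clear the pool. The helper name isHeartbeatTimeout is hypothetical; the patches compare the unwrapped error directly with == and a plain type assertion because unwrapConnectionError has already removed any wrapping, whereas this sketch uses errors.Is and errors.As, which also handle wrapped errors.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// isHeartbeatTimeout reports whether err counts as a timeout for the
// retry-once logic: the first such error triggers an immediate re-check,
// and only a second consecutive one clears the connection pool.
func isHeartbeatTimeout(err error) bool {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

func main() {
	fmt.Println(isHeartbeatTimeout(context.DeadlineExceeded))         // true
	fmt.Println(isHeartbeatTimeout(&net.DNSError{IsTimeout: true}))   // true
	fmt.Println(isHeartbeatTimeout(errors.New("connection refused"))) // false
}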