Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions pkg/hostagent/hostagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type HostAgent struct {
portForwarder *portForwarder // legacy SSH port forwarder
grpcPortForwarder *portfwd.Forwarder

onClose []func() error // LIFO
onClose []func() error // LIFO
onCloseMu sync.Mutex

driver driver.Driver
signalCh chan os.Signal
Expand Down Expand Up @@ -462,26 +463,19 @@ func (a *HostAgent) startRoutinesAndWait(ctx context.Context, errCh <-chan error
stRunning.Running = true
a.emitEvent(ctx, events.Event{Status: stRunning})
}()
for {
select {
case driverErr := <-errCh:
logrus.Infof("Driver stopped due to error: %q", driverErr)
cancelHA()
if closeErr := a.close(); closeErr != nil {
logrus.WithError(closeErr).Warn("an error during shutting down the host agent")
}
err := a.driver.Stop(ctx)
return err
case sig := <-a.signalCh:
logrus.Infof("Received %s, shutting down the host agent", osutil.SignalName(sig))
cancelHA()
if closeErr := a.close(); closeErr != nil {
logrus.WithError(closeErr).Warn("an error during shutting down the host agent")
}
err := a.driver.Stop(ctx)
return err
}
}
// wait for either the driver to stop or a signal to shut down
select {
case driverErr := <-errCh:
logrus.Infof("Driver stopped due to error: %q", driverErr)
case sig := <-a.signalCh:
logrus.Infof("Received %s, shutting down the host agent", osutil.SignalName(sig))
}
// close the host agent routines before cancelling the context
if closeErr := a.close(); closeErr != nil {
logrus.WithError(closeErr).Warn("an error during shutting down the host agent")
}
cancelHA()
return a.driver.Stop(ctx)
}

func (a *HostAgent) Info(_ context.Context) (*hostagentapi.Info, error) {
Expand All @@ -502,7 +496,7 @@ func (a *HostAgent) startHostAgentRoutines(ctx context.Context) error {
}
logrus.Info(msg)
}
a.onClose = append(a.onClose, func() error {
a.cleanUp(func() error {
logrus.Debugf("shutting down the SSH master")
if exitMasterErr := ssh.ExitMaster(a.instSSHAddress, a.sshLocalPort, a.sshConfig); exitMasterErr != nil {
logrus.WithError(exitMasterErr).Warn("failed to exit SSH master")
Expand Down Expand Up @@ -531,7 +525,7 @@ sudo chown -R "${USER}" /run/host-services`
if err != nil {
errs = append(errs, err)
}
a.onClose = append(a.onClose, func() error {
a.cleanUp(func() error {
var unmountErrs []error
for _, m := range mounts {
if unmountErr := m.close(); unmountErr != nil {
Expand All @@ -542,7 +536,7 @@ sudo chown -R "${USER}" /run/host-services`
})
}
if len(a.instConfig.AdditionalDisks) > 0 {
a.onClose = append(a.onClose, func() error {
a.cleanUp(func() error {
var unlockErrs []error
for _, d := range a.instConfig.AdditionalDisks {
disk, inspectErr := store.InspectDisk(d.Name)
Expand Down Expand Up @@ -601,7 +595,7 @@ sudo chown -R "${USER}" /run/host-services`
errs = append(errs, err)
}
}
a.onClose = append(a.onClose, func() error {
a.cleanUp(func() error {
var rmErrs []error
for _, rule := range a.instConfig.CopyToHost {
if rule.DeleteOnStop {
Expand All @@ -616,7 +610,17 @@ sudo chown -R "${USER}" /run/host-services`
return errors.Join(errs...)
}

// cleanUp registers a cleanup function to be called when the host agent is stopped.
// The cleanup functions are called before the context is cancelled, in the reverse order of their registration.
func (a *HostAgent) cleanUp(fn func() error) {
a.onCloseMu.Lock()
defer a.onCloseMu.Unlock()
a.onClose = append(a.onClose, fn)
}

func (a *HostAgent) close() error {
a.onCloseMu.Lock()
defer a.onCloseMu.Unlock()
logrus.Infof("Shutting down the host agent")
var errs []error
for i := len(a.onClose) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -645,7 +649,7 @@ func (a *HostAgent) watchGuestAgentEvents(ctx context.Context) {
localUnix := filepath.Join(a.instDir, filenames.GuestAgentSock)
remoteUnix := "/run/lima-guestagent.sock"

a.onClose = append(a.onClose, func() error {
a.cleanUp(func() error {
logrus.Debugf("Stop forwarding unix sockets")
var errs []error
for _, rule := range a.instConfig.PortForwards {
Expand Down Expand Up @@ -679,6 +683,9 @@ func (a *HostAgent) watchGuestAgentEvents(ctx context.Context) {
}
}()

// ensure close before ctx is cancelled
a.cleanUp(a.grpcPortForwarder.Close)

for {
if a.client == nil || !isGuestAgentSocketAccessible(ctx, a.client) {
if a.driver.ForwardGuestAgent() {
Expand Down Expand Up @@ -811,7 +818,6 @@ func (a *HostAgent) processGuestAgentEvents(ctx context.Context, client *guestag
a.grpcPortForwarder.OnEvent(ctx, client, ev)
}
}
defer a.grpcPortForwarder.Close()

if err := client.Events(ctx, onEvent); err != nil {
if status.Code(err) == codes.Canceled {
Expand Down
4 changes: 2 additions & 2 deletions pkg/portfwd/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func NewPortForwarder(rules []limatype.PortForward, ignoreTCP, ignoreUDP bool) *
}
}

func (fw *Forwarder) Close() {
fw.closableListeners.Close()
func (fw *Forwarder) Close() error {
return fw.closableListeners.Close()
}

func (fw *Forwarder) OnEvent(ctx context.Context, client *guestagentclient.GuestAgentClient, ev *api.Event) {
Expand Down
12 changes: 9 additions & 3 deletions pkg/portfwd/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,25 @@ func NewClosableListener() *ClosableListeners {
}
}

func (p *ClosableListeners) Close() {
func (p *ClosableListeners) Close() error {
p.listenersRW.Lock()
defer p.listenersRW.Unlock()
var errs []error
for _, listener := range p.listeners {
listener.Close()
if err := listener.Close(); err != nil {
errs = append(errs, err)
}
}
clear(p.listeners)
p.udpListenersRW.Lock()
defer p.udpListenersRW.Unlock()
for _, listener := range p.udpListeners {
listener.Close()
if err := listener.Close(); err != nil {
errs = append(errs, err)
}
}
clear(p.udpListeners)
return errors.Join(errs...)
}

func (p *ClosableListeners) Forward(ctx context.Context, client *guestagentclient.GuestAgentClient,
Expand Down
Loading