diff --git a/.codespellrc b/.codespellrc
index 0e785ac5c..e83793750 100644
--- a/.codespellrc
+++ b/.codespellrc
@@ -1,3 +1,3 @@
[codespell]
-ignore-words-list = NotIn,notin,AfterAll,ND,aks
+ignore-words-list = NotIn,notin,AfterAll,ND,aks,deriver
skip = *.svg,*.mod,*.sum
diff --git a/cmd/thv-operator/api/v1alpha1/mcpregistry_types.go b/cmd/thv-operator/api/v1alpha1/mcpregistry_types.go
index 5147b7e70..40478afc1 100644
--- a/cmd/thv-operator/api/v1alpha1/mcpregistry_types.go
+++ b/cmd/thv-operator/api/v1alpha1/mcpregistry_types.go
@@ -205,7 +205,7 @@ type MCPRegistryStatus struct {
// SyncStatus provides detailed information about data synchronization
type SyncStatus struct {
// Phase represents the current synchronization phase
- // +kubebuilder:validation:Enum=Idle;Syncing;Complete;Failed
+ // +kubebuilder:validation:Enum=Syncing;Complete;Failed
Phase SyncPhase `json:"phase"`
// Message provides additional information about the sync status
@@ -256,13 +256,10 @@ type APIStatus struct {
}
// SyncPhase represents the data synchronization state
-// +kubebuilder:validation:Enum=Idle;Syncing;Complete;Failed
+// +kubebuilder:validation:Enum=Syncing;Complete;Failed
type SyncPhase string
const (
- // SyncPhaseIdle means no sync is needed or scheduled
- SyncPhaseIdle SyncPhase = "Idle"
-
// SyncPhaseSyncing means sync is currently in progress
SyncPhaseSyncing SyncPhase = "Syncing"
@@ -389,7 +386,7 @@ func (r *MCPRegistry) DeriveOverallPhase() MCPRegistryPhase {
apiStatus := r.Status.APIStatus
// Default phases if status not set
- syncPhase := SyncPhaseIdle
+ var syncPhase SyncPhase
if syncStatus != nil {
syncPhase = syncStatus.Phase
}
@@ -409,8 +406,8 @@ func (r *MCPRegistry) DeriveOverallPhase() MCPRegistryPhase {
return MCPRegistryPhaseSyncing
}
- // If sync is complete or idle (no sync needed), check API status
- if syncPhase == SyncPhaseComplete || syncPhase == SyncPhaseIdle {
+ // If sync is complete (no sync needed), check API status
+ if syncPhase == SyncPhaseComplete {
switch apiPhase {
case APIPhaseReady:
return MCPRegistryPhaseReady
diff --git a/cmd/thv-operator/api/v1alpha1/mcpregistry_types_test.go b/cmd/thv-operator/api/v1alpha1/mcpregistry_types_test.go
index 6b1ce90ab..9f6d6e626 100644
--- a/cmd/thv-operator/api/v1alpha1/mcpregistry_types_test.go
+++ b/cmd/thv-operator/api/v1alpha1/mcpregistry_types_test.go
@@ -131,63 +131,6 @@ func TestMCPRegistry_DeriveOverallPhase(t *testing.T) {
description: "Sync complete + API unhealthy should result in Pending",
},
- // Sync Idle + API combinations (key test cases for the recent fix)
- {
- name: "sync_idle_api_ready",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: &APIStatus{
- Phase: APIPhaseReady,
- },
- expectedPhase: MCPRegistryPhaseReady,
- description: "Sync idle + API ready should result in Ready (fixed behavior)",
- },
- {
- name: "sync_idle_api_error",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: &APIStatus{
- Phase: APIPhaseError,
- },
- expectedPhase: MCPRegistryPhaseFailed,
- description: "Sync idle + API error should result in Failed",
- },
- {
- name: "sync_idle_api_notstarted",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: &APIStatus{
- Phase: APIPhaseNotStarted,
- },
- expectedPhase: MCPRegistryPhasePending,
- description: "Sync idle + API not started should result in Pending",
- },
- {
- name: "sync_idle_api_deploying",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: &APIStatus{
- Phase: APIPhaseDeploying,
- },
- expectedPhase: MCPRegistryPhasePending,
- description: "Sync idle + API deploying should result in Pending",
- },
- {
- name: "sync_idle_api_unhealthy",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: &APIStatus{
- Phase: APIPhaseUnhealthy,
- },
- expectedPhase: MCPRegistryPhasePending,
- description: "Sync idle + API unhealthy should result in Pending",
- },
-
// Partial status combinations (one nil, one set)
{
name: "sync_complete_api_nil",
@@ -200,19 +143,8 @@ func TestMCPRegistry_DeriveOverallPhase(t *testing.T) {
name: "sync_nil_api_ready",
syncStatus: nil,
apiStatus: &APIStatus{Phase: APIPhaseReady},
- expectedPhase: MCPRegistryPhaseReady,
- description: "Sync nil + API ready should result in Ready (sync defaults to Idle, which is treated as valid)",
- },
-
- // Edge case: sync idle with nil API (common in real scenarios)
- {
- name: "sync_idle_api_nil",
- syncStatus: &SyncStatus{
- Phase: SyncPhaseIdle,
- },
- apiStatus: nil,
expectedPhase: MCPRegistryPhasePending,
- description: "Sync idle + API nil should result in Pending (API defaults to NotStarted)",
+ description: "Sync nil + API ready should result in Pending",
},
}
@@ -247,7 +179,7 @@ func TestMCPRegistry_DeriveOverallPhase(t *testing.T) {
// Helper function to get sync phase as string for better test output
func getSyncPhaseString(syncStatus *SyncStatus) string {
if syncStatus == nil {
- return "nil (defaults to Idle)"
+ return "nil"
}
return string(syncStatus.Phase)
}
@@ -264,20 +196,20 @@ func getAPIPhaseString(apiStatus *APIStatus) string {
func TestMCPRegistry_DeriveOverallPhase_EdgeCases(t *testing.T) {
t.Parallel()
- t.Run("regression_test_idle_ready_becomes_ready", func(t *testing.T) {
+ t.Run("regression_test_complete_ready_becomes_ready", func(t *testing.T) {
t.Parallel()
// This is the specific regression test for the bug fix where
- // syncPhase=Idle + apiPhase=Ready was incorrectly returning Pending
+ // syncPhase=Complete + apiPhase=Ready was incorrectly returning Pending
registry := &MCPRegistry{
Status: MCPRegistryStatus{
- SyncStatus: &SyncStatus{Phase: SyncPhaseIdle},
+ SyncStatus: &SyncStatus{Phase: SyncPhaseComplete},
APIStatus: &APIStatus{Phase: APIPhaseReady},
},
}
phase := registry.DeriveOverallPhase()
assert.Equal(t, MCPRegistryPhaseReady, phase,
- "The specific case syncPhase=Idle + apiPhase=Ready should result in Ready phase")
+ "The specific case syncPhase=Complete + apiPhase=Ready should result in Ready phase")
})
t.Run("all_api_phases_with_failed_sync", func(t *testing.T) {
diff --git a/cmd/thv-operator/controllers/mcpregistry_controller.go b/cmd/thv-operator/controllers/mcpregistry_controller.go
index 0aad6c965..92e5091aa 100644
--- a/cmd/thv-operator/controllers/mcpregistry_controller.go
+++ b/cmd/thv-operator/controllers/mcpregistry_controller.go
@@ -7,7 +7,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
+ kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -22,6 +22,19 @@ import (
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/sync"
)
+// Default timing constants for the controller
+const (
+ // DefaultControllerRetryAfterConstant is the constant default retry interval for controller operations that fail
+ DefaultControllerRetryAfterConstant = time.Minute * 5
+)
+
+// Configurable timing variables for testing
+var (
+ // DefaultControllerRetryAfter is the configurable default retry interval for controller operations that fail
+ // This can be modified in tests to speed up retry behavior
+ DefaultControllerRetryAfter = DefaultControllerRetryAfterConstant
+)
+
// MCPRegistryReconciler reconciles a MCPRegistry object
type MCPRegistryReconciler struct {
client.Client
@@ -81,7 +94,7 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mcpRegistry := &mcpv1alpha1.MCPRegistry{}
err := r.Get(ctx, req.NamespacedName, mcpRegistry)
if err != nil {
- if errors.IsNotFound(err) {
+ if kerrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Return and don't requeue
ctxLogger.Info("MCPRegistry resource not found. Ignoring since object must be deleted")
@@ -148,17 +161,37 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}
- // 3. Create status collector for batched updates
- statusCollector := mcpregistrystatus.NewCollector(mcpRegistry)
+ // 3. Create status manager for batched updates with separation of concerns
+ statusManager := mcpregistrystatus.NewStatusManager(mcpRegistry)
// 4. Reconcile sync operation
- result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusCollector)
+ result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusManager)
// 5. Reconcile API service (deployment and service, independent of sync status)
if syncErr == nil {
- if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry, statusCollector); apiErr != nil {
+ if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry); apiErr != nil {
ctxLogger.Error(apiErr, "Failed to reconcile API service")
+ // Set API status with detailed error message from structured error
+ statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseError, apiErr.Message, "")
+ statusManager.API().SetAPIReadyCondition(apiErr.ConditionReason, apiErr.Message, metav1.ConditionFalse)
err = apiErr
+ } else {
+ // API reconciliation successful - check readiness and set appropriate status
+ isReady := r.registryAPIManager.IsAPIReady(ctx, mcpRegistry)
+ if isReady {
+ // In-cluster endpoint (simplified form works for internal access)
+ endpoint := fmt.Sprintf("http://%s.%s:8080",
+ mcpRegistry.GetAPIResourceName(), mcpRegistry.Namespace)
+ statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseReady,
+ "Registry API is ready and serving requests", endpoint)
+ statusManager.API().SetAPIReadyCondition("APIReady",
+ "Registry API is ready and serving requests", metav1.ConditionTrue)
+ } else {
+ statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseDeploying,
+ "Registry API deployment is not ready yet", "")
+ statusManager.API().SetAPIReadyCondition("APINotReady",
+ "Registry API deployment is not ready yet", metav1.ConditionFalse)
+ }
}
}
@@ -171,10 +204,11 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// 7. Derive overall phase and message from sync and API status
- r.deriveOverallStatus(ctx, mcpRegistry, statusCollector)
+ statusDeriver := mcpregistrystatus.NewDefaultStatusDeriver()
+ r.deriveOverallStatus(ctx, mcpRegistry, statusManager, statusDeriver)
// 8. Apply all status changes in a single batch update
- if statusUpdateErr := statusCollector.Apply(ctx, r.Client); statusUpdateErr != nil {
+ if statusUpdateErr := statusManager.Apply(ctx, r.Client); statusUpdateErr != nil {
ctxLogger.Error(statusUpdateErr, "Failed to apply batched status update")
// Return the status update error only if there was no main reconciliation error
if syncErr == nil {
@@ -189,7 +223,7 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Log reconciliation completion
if err != nil {
ctxLogger.Error(err, "Reconciliation completed with error",
- "MCPRegistry.Name", mcpRegistry.Name)
+ "MCPRegistry.Name", mcpRegistry.Name, "requeueAfter", result.RequeueAfter)
} else {
var syncPhase, apiPhase string
if mcpRegistry.Status.SyncStatus != nil {
@@ -227,36 +261,16 @@ func (*MCPRegistryReconciler) preserveExistingSyncData(mcpRegistry *mcpv1alpha1.
//
//nolint:gocyclo // Complex reconciliation logic requires multiple conditions
func (r *MCPRegistryReconciler) reconcileSync(
- ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
+ ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusManager mcpregistrystatus.StatusManager,
) (ctrl.Result, error) {
ctxLogger := log.FromContext(ctx)
// Check if sync is needed - no need to refresh object here since we just fetched it
- syncNeeded, syncReason, nextSyncTime, err := r.syncManager.ShouldSync(ctx, mcpRegistry)
- if err != nil {
- ctxLogger.Error(err, "Failed to determine if sync is needed")
- // Proceed with sync on error to be safe
- syncNeeded = true
- syncReason = sync.ReasonErrorCheckingSyncNeed
- }
+ syncNeeded, syncReason, nextSyncTime := r.syncManager.ShouldSync(ctx, mcpRegistry)
if !syncNeeded {
ctxLogger.Info("Sync not needed", "reason", syncReason)
-
- // Only update sync status if it's not already Idle with the right message
- currentSyncPhase := mcpv1alpha1.SyncPhaseIdle // default
- currentMessage := ""
- if mcpRegistry.Status.SyncStatus != nil {
- currentSyncPhase = mcpRegistry.Status.SyncStatus.Phase
- currentMessage = mcpRegistry.Status.SyncStatus.Message
- }
-
- // Only set sync status if it needs to change
- if currentSyncPhase != mcpv1alpha1.SyncPhaseIdle || currentMessage != "No sync required" {
- // Preserve existing sync data when no sync is needed
- lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
- statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseIdle, "No sync required", 0, lastSyncTime, lastSyncHash, serverCount)
- }
+ // Do not update sync status if sync is not needed: this would cause unnecessary status updates
// Schedule next reconciliation if we have a sync policy
if nextSyncTime != nil {
@@ -273,7 +287,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
if syncReason == sync.ReasonManualNoChanges {
// Preserve existing sync data for manual sync with no changes
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
- statusCollector.SetSyncStatus(
+ statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseComplete, "Manual sync completed (no data changes)", 0,
lastSyncTime, lastSyncHash, serverCount)
return r.syncManager.UpdateManualSyncTriggerOnly(ctx, mcpRegistry)
@@ -281,32 +295,43 @@ func (r *MCPRegistryReconciler) reconcileSync(
// Set sync status to syncing before starting the operation
// Clear sync data when starting sync operation
- statusCollector.SetSyncStatus(
+ statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseSyncing, "Synchronizing registry data",
getCurrentAttemptCount(mcpRegistry)+1, nil, "", 0)
// Perform the sync - the sync manager will handle core registry field updates
- result, syncResult, err := r.syncManager.PerformSync(ctx, mcpRegistry)
+ result, syncResult, syncErr := r.syncManager.PerformSync(ctx, mcpRegistry)
- if err != nil {
+ if syncErr != nil {
// Sync failed - set sync status to failed
- ctxLogger.Error(err, "Sync failed, scheduling retry")
+ ctxLogger.Error(syncErr, "Sync failed, scheduling retry")
// Preserve existing sync data when sync fails
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
- statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
- fmt.Sprintf("Sync failed: %v", err), getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)
+
+ // Set sync status with detailed error message from SyncError
+ statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
+ syncErr.Message, getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)
+ // Set the appropriate condition based on the error type
+ statusManager.Sync().SetSyncCondition(metav1.Condition{
+ Type: syncErr.ConditionType,
+ Status: metav1.ConditionFalse,
+ Reason: syncErr.ConditionReason,
+ Message: syncErr.Message,
+ LastTransitionTime: metav1.Now(),
+ })
+
// Use a shorter retry interval instead of the full sync interval
- retryAfter := time.Minute * 5 // Default retry interval
+ retryAfter := DefaultControllerRetryAfter // Default retry interval
if result.RequeueAfter > 0 {
// If PerformSync already set a retry interval, use it
retryAfter = result.RequeueAfter
}
- return ctrl.Result{RequeueAfter: retryAfter}, err
+ return ctrl.Result{RequeueAfter: retryAfter}, syncErr
}
// Sync successful - set sync status to complete using data from sync result
now := metav1.Now()
- statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
+ statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
&now, syncResult.Hash, syncResult.ServerCount)
ctxLogger.Info("Registry data sync completed successfully")
@@ -326,7 +351,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
ctxLogger.Info("Sync successful, no automatic sync policy configured")
}
- return result, err
+ return result, nil
}
// finalizeMCPRegistry performs the finalizer logic for the MCPRegistry
@@ -356,61 +381,25 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
}
// deriveOverallStatus determines the overall MCPRegistry phase and message based on sync and API status
-func (r *MCPRegistryReconciler) deriveOverallStatus(
- ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector) {
+func (*MCPRegistryReconciler) deriveOverallStatus(
+ ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
+ statusManager mcpregistrystatus.StatusManager, statusDeriver mcpregistrystatus.StatusDeriver) {
ctxLogger := log.FromContext(ctx)
- // Create a temporary copy with current collected status to derive phase
- tempRegistry := mcpRegistry.DeepCopy()
-
- // Apply the collected status changes to temp registry for phase calculation
- // Note: This is a simulation - we can't actually access the collected values directly
- // Instead, we'll use the DeriveOverallPhase method which works with current status
- // The controller will need to be smart about when sync/API status get updated
-
- // For now, let's derive phase based on current MCPRegistry status since
- // the status collector changes haven't been applied yet
- derivedPhase := tempRegistry.DeriveOverallPhase()
- derivedMessage := r.deriveMessage(derivedPhase, tempRegistry)
+ // Use the StatusDeriver to determine the overall phase and message
+ // based on current sync and API statuses
+ derivedPhase, derivedMessage := statusDeriver.DeriveOverallStatus(
+ mcpRegistry.Status.SyncStatus,
+ mcpRegistry.Status.APIStatus,
+ )
// Only update phase and message if they've changed
- if mcpRegistry.Status.Phase != derivedPhase {
- statusCollector.SetPhase(derivedPhase)
- ctxLogger.Info("Updated overall phase", "oldPhase", mcpRegistry.Status.Phase, "newPhase", derivedPhase)
- }
-
- if mcpRegistry.Status.Message != derivedMessage {
- statusCollector.SetMessage(derivedMessage)
- ctxLogger.Info("Updated overall message", "message", derivedMessage)
- }
-}
-
-// deriveMessage creates an appropriate message based on the overall phase and registry state
-func (*MCPRegistryReconciler) deriveMessage(phase mcpv1alpha1.MCPRegistryPhase, mcpRegistry *mcpv1alpha1.MCPRegistry) string {
- switch phase {
- case mcpv1alpha1.MCPRegistryPhasePending:
- if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseComplete {
- return "Registry data synced, API deployment in progress"
- }
- return "Registry initialization in progress"
- case mcpv1alpha1.MCPRegistryPhaseReady:
- return "Registry is ready and API is serving requests"
- case mcpv1alpha1.MCPRegistryPhaseFailed:
- // Return more specific error message if available
- if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseFailed {
- return fmt.Sprintf("Sync failed: %s", mcpRegistry.Status.SyncStatus.Message)
- }
- if mcpRegistry.Status.APIStatus != nil && mcpRegistry.Status.APIStatus.Phase == mcpv1alpha1.APIPhaseError {
- return fmt.Sprintf("API deployment failed: %s", mcpRegistry.Status.APIStatus.Message)
- }
- return "Registry operation failed"
- case mcpv1alpha1.MCPRegistryPhaseSyncing:
- return "Registry data synchronization in progress"
- case mcpv1alpha1.MCPRegistryPhaseTerminating:
- return "Registry is being terminated"
- default:
- return "Registry status unknown"
- }
+ statusManager.SetOverallStatus(derivedPhase, derivedMessage)
+ ctxLogger.Info("Updated overall status",
+ "oldPhase", mcpRegistry.Status.Phase,
+ "newPhase", derivedPhase,
+ "oldMessage", mcpRegistry.Status.Message,
+ "newMessage", derivedMessage)
}
// SetupWithManager sets up the controller with the Manager.
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/collector.go b/cmd/thv-operator/pkg/mcpregistrystatus/collector.go
index 7bd607619..7cdb443bf 100644
--- a/cmd/thv-operator/pkg/mcpregistrystatus/collector.go
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/collector.go
@@ -15,7 +15,7 @@ import (
// StatusCollector collects status changes during reconciliation
// and applies them in a single batch update at the end.
-// It implements the Collector interface.
+// It implements the StatusManager interface.
type StatusCollector struct {
mcpRegistry *mcpv1alpha1.MCPRegistry
hasChanges bool
@@ -24,14 +24,36 @@ type StatusCollector struct {
syncStatus *mcpv1alpha1.SyncStatus
apiStatus *mcpv1alpha1.APIStatus
conditions map[string]metav1.Condition
+
+ // Component collectors
+ syncCollector *syncStatusCollector
+ apiCollector *apiStatusCollector
+}
+
+// syncStatusCollector implements SyncStatusCollector
+type syncStatusCollector struct {
+ parent *StatusCollector
+}
+
+// apiStatusCollector implements APIStatusCollector
+type apiStatusCollector struct {
+ parent *StatusCollector
+}
+
+// NewStatusManager creates a new StatusManager for the given MCPRegistry resource.
+func NewStatusManager(mcpRegistry *mcpv1alpha1.MCPRegistry) StatusManager {
+ return newStatusCollector(mcpRegistry)
}
-// NewCollector creates a new status update collector for the given MCPRegistry resource.
-func NewCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) Collector {
- return &StatusCollector{
+// newStatusCollector creates the internal StatusCollector implementation
+func newStatusCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) *StatusCollector {
+ collector := &StatusCollector{
mcpRegistry: mcpRegistry,
conditions: make(map[string]metav1.Condition),
}
+ collector.syncCollector = &syncStatusCollector{parent: collector}
+ collector.apiCollector = &apiStatusCollector{parent: collector}
+ return collector
}
// SetPhase sets the phase to be updated.
@@ -46,10 +68,10 @@ func (s *StatusCollector) SetMessage(message string) {
s.hasChanges = true
}
-// SetAPIReadyCondition adds or updates the API ready condition.
-func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
- s.conditions[mcpv1alpha1.ConditionAPIReady] = metav1.Condition{
- Type: mcpv1alpha1.ConditionAPIReady,
+// SetCondition sets a general condition with the specified type, reason, message, and status
+func (s *StatusCollector) SetCondition(conditionType, reason, message string, status metav1.ConditionStatus) {
+ s.conditions[conditionType] = metav1.Condition{
+ Type: conditionType,
Status: status,
Reason: reason,
Message: message,
@@ -57,6 +79,11 @@ func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status me
s.hasChanges = true
}
+// SetAPIReadyCondition adds or updates the API ready condition.
+func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
+ s.SetCondition(mcpv1alpha1.ConditionAPIReady, reason, message, status)
+}
+
// SetSyncStatus sets the detailed sync status.
func (s *StatusCollector) SetSyncStatus(
phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
@@ -148,3 +175,47 @@ func (s *StatusCollector) Apply(ctx context.Context, k8sClient client.Client) er
return nil
}
+
+// StatusManager interface methods
+
+// Sync returns the sync status collector
+func (s *StatusCollector) Sync() SyncStatusCollector {
+ return s.syncCollector
+}
+
+// API returns the API status collector
+func (s *StatusCollector) API() APIStatusCollector {
+ return s.apiCollector
+}
+
+// SetOverallStatus sets the overall phase and message explicitly (for special cases)
+func (s *StatusCollector) SetOverallStatus(phase mcpv1alpha1.MCPRegistryPhase, message string) {
+ s.SetPhase(phase)
+ s.SetMessage(message)
+}
+
+// SyncStatusCollector implementation
+
+// SetSyncCondition sets a sync-related condition
+func (sc *syncStatusCollector) SetSyncCondition(condition metav1.Condition) {
+ sc.parent.conditions[condition.Type] = condition
+ sc.parent.hasChanges = true
+}
+
+// SetSyncStatus delegates to the parent's SetSyncStatus method
+func (sc *syncStatusCollector) SetSyncStatus(phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
+ lastSyncTime *metav1.Time, lastSyncHash string, serverCount int) {
+ sc.parent.SetSyncStatus(phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
+}
+
+// APIStatusCollector implementation
+
+// SetAPIStatus delegates to the parent's SetAPIStatus method
+func (ac *apiStatusCollector) SetAPIStatus(phase mcpv1alpha1.APIPhase, message string, endpoint string) {
+ ac.parent.SetAPIStatus(phase, message, endpoint)
+}
+
+// SetAPIReadyCondition delegates to the parent's SetAPIReadyCondition method
+func (ac *apiStatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
+ ac.parent.SetAPIReadyCondition(reason, message, status)
+}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/collector_test.go b/cmd/thv-operator/pkg/mcpregistrystatus/collector_test.go
index 872c9dc80..86e778757 100644
--- a/cmd/thv-operator/pkg/mcpregistrystatus/collector_test.go
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/collector_test.go
@@ -13,7 +13,7 @@ import (
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)
-func TestNewCollector(t *testing.T) {
+func TestNewStatusManager(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{
@@ -23,10 +23,10 @@ func TestNewCollector(t *testing.T) {
},
}
- collector := NewCollector(registry)
+ statusManager := NewStatusManager(registry)
- assert.NotNil(t, collector)
- sc := collector.(*StatusCollector)
+ assert.NotNil(t, statusManager)
+ sc := statusManager.(*StatusCollector)
assert.Equal(t, registry, sc.mcpRegistry)
assert.False(t, sc.hasChanges)
assert.Empty(t, sc.conditions)
@@ -58,7 +58,7 @@ func TestStatusCollector_SetPhase(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetPhase(tt.phase)
@@ -73,7 +73,7 @@ func TestStatusCollector_SetMessage(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
testMessage := "Test message"
collector.SetMessage(testMessage)
@@ -114,7 +114,7 @@ func TestStatusCollector_SetAPIReadyCondition(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetAPIReadyCondition(tt.reason, tt.message, tt.status)
@@ -143,8 +143,8 @@ func TestStatusCollector_SetSyncStatus(t *testing.T) {
serverCount int
}{
{
- name: "sync status idle",
- phase: mcpv1alpha1.SyncPhaseIdle,
+ name: "sync status complete",
+ phase: mcpv1alpha1.SyncPhaseComplete,
message: "No sync required",
attemptCount: 0,
lastSyncTime: &metav1.Time{Time: metav1.Now().Time},
@@ -176,7 +176,7 @@ func TestStatusCollector_SetSyncStatus(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetSyncStatus(tt.phase, tt.message, tt.attemptCount, tt.lastSyncTime, tt.lastSyncHash, tt.serverCount)
@@ -230,7 +230,7 @@ func TestStatusCollector_SetAPIStatus(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetAPIStatus(tt.phase, tt.message, tt.endpoint)
@@ -257,7 +257,7 @@ func TestStatusCollector_SetAPIStatus_ReadySince(t *testing.T) {
},
},
}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetAPIStatus(mcpv1alpha1.APIPhaseReady, "API is ready", "http://test.com")
@@ -275,7 +275,7 @@ func TestStatusCollector_SetAPIStatus_ReadySince(t *testing.T) {
},
},
}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetAPIStatus(mcpv1alpha1.APIPhaseReady, "API is ready", "http://test.com")
@@ -285,7 +285,7 @@ func TestStatusCollector_SetAPIStatus_ReadySince(t *testing.T) {
t.Run("clears ReadySince when not ready", func(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
collector.SetAPIStatus(mcpv1alpha1.APIPhaseError, "API failed", "")
@@ -320,7 +320,7 @@ func TestStatusCollector_Apply(t *testing.T) {
t.Run("applies no changes when hasChanges is false", func(t *testing.T) {
t.Parallel()
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
err := collector.Apply(ctx, k8sClient)
@@ -329,7 +329,7 @@ func TestStatusCollector_Apply(t *testing.T) {
t.Run("verifies hasChanges behavior", func(t *testing.T) {
t.Parallel()
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
// Initially no changes
assert.False(t, collector.hasChanges)
@@ -341,7 +341,7 @@ func TestStatusCollector_Apply(t *testing.T) {
t.Run("verifies status field collection", func(t *testing.T) {
t.Parallel()
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
// Set various status fields
collector.SetPhase(mcpv1alpha1.MCPRegistryPhaseReady)
@@ -370,7 +370,7 @@ func TestStatusCollector_NoChanges(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
// Initially no changes
assert.False(t, collector.hasChanges)
@@ -384,7 +384,7 @@ func TestStatusCollector_MultipleConditions(t *testing.T) {
t.Parallel()
registry := &mcpv1alpha1.MCPRegistry{}
- collector := NewCollector(registry).(*StatusCollector)
+ collector := NewStatusManager(registry).(*StatusCollector)
// Add condition
collector.SetAPIReadyCondition("APIReady", "API is ready", metav1.ConditionTrue)
@@ -393,3 +393,165 @@ func TestStatusCollector_MultipleConditions(t *testing.T) {
assert.Len(t, collector.conditions, 1)
assert.Contains(t, collector.conditions, mcpv1alpha1.ConditionAPIReady)
}
+
+func TestStatusCollector_ApplyErrors(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+
+ // Create scheme
+ scheme := runtime.NewScheme()
+ require.NoError(t, mcpv1alpha1.AddToScheme(scheme))
+
+ t.Run("error fetching latest registry", func(t *testing.T) {
+ t.Parallel()
+
+ // Create client that will fail on Get
+ k8sClient := fake.NewClientBuilder().WithScheme(scheme).Build()
+
+ // Create collector with registry that doesn't exist in client
+ registry := &mcpv1alpha1.MCPRegistry{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "nonexistent-registry",
+ Namespace: "default",
+ },
+ }
+
+ collector := newStatusCollector(registry)
+ collector.SetPhase(mcpv1alpha1.MCPRegistryPhaseReady) // Make some changes
+
+ err := collector.Apply(ctx, k8sClient)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "failed to fetch latest MCPRegistry version")
+ })
+
+}
+
+func TestStatusCollector_InterfaceMethods(t *testing.T) {
+ t.Parallel()
+
+ registry := &mcpv1alpha1.MCPRegistry{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-registry",
+ Namespace: "default",
+ },
+ }
+
+ collector := newStatusCollector(registry)
+
+ t.Run("Sync method returns sync collector", func(t *testing.T) {
+ t.Parallel()
+ syncCollector := collector.Sync()
+ assert.NotNil(t, syncCollector)
+ assert.IsType(t, &syncStatusCollector{}, syncCollector)
+ })
+
+ t.Run("API method returns API collector", func(t *testing.T) {
+ t.Parallel()
+ apiCollector := collector.API()
+ assert.NotNil(t, apiCollector)
+ assert.IsType(t, &apiStatusCollector{}, apiCollector)
+ })
+
+ t.Run("SetOverallStatus delegates correctly", func(t *testing.T) {
+ t.Parallel()
+ collector.SetOverallStatus(mcpv1alpha1.MCPRegistryPhaseReady, "Test message")
+
+ assert.True(t, collector.hasChanges)
+ assert.Equal(t, mcpv1alpha1.MCPRegistryPhaseReady, *collector.phase)
+ assert.Equal(t, "Test message", *collector.message)
+ })
+}
+
+func TestSyncStatusCollector_Methods(t *testing.T) {
+ t.Parallel()
+
+ registry := &mcpv1alpha1.MCPRegistry{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-registry",
+ Namespace: "default",
+ },
+ }
+
+ collector := newStatusCollector(registry)
+ syncCollector := collector.Sync()
+
+ t.Run("SetSyncCondition delegates correctly", func(t *testing.T) {
+ t.Parallel()
+ condition := metav1.Condition{
+ Type: "TestCondition",
+ Status: metav1.ConditionTrue,
+ Reason: "TestReason",
+ Message: "Test message",
+ }
+
+ syncCollector.SetSyncCondition(condition)
+
+ assert.True(t, collector.hasChanges)
+ assert.Contains(t, collector.conditions, "TestCondition")
+ assert.Equal(t, condition, collector.conditions["TestCondition"])
+ })
+
+ t.Run("SetSyncStatus delegates correctly", func(t *testing.T) {
+ t.Parallel()
+ now := metav1.Now()
+ syncCollector.SetSyncStatus(
+ mcpv1alpha1.SyncPhaseComplete,
+ "Sync completed",
+ 1,
+ &now,
+ "hash123",
+ 5,
+ )
+
+ assert.True(t, collector.hasChanges)
+ assert.NotNil(t, collector.syncStatus)
+ assert.Equal(t, mcpv1alpha1.SyncPhaseComplete, collector.syncStatus.Phase)
+ assert.Equal(t, "Sync completed", collector.syncStatus.Message)
+ assert.Equal(t, 1, collector.syncStatus.AttemptCount)
+ assert.Equal(t, &now, collector.syncStatus.LastSyncTime)
+ assert.Equal(t, "hash123", collector.syncStatus.LastSyncHash)
+ assert.Equal(t, 5, collector.syncStatus.ServerCount)
+ })
+}
+
+func TestAPIStatusCollector_Methods(t *testing.T) {
+ t.Parallel()
+
+ registry := &mcpv1alpha1.MCPRegistry{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-registry",
+ Namespace: "default",
+ },
+ }
+
+ collector := newStatusCollector(registry)
+ apiCollector := collector.API()
+
+ t.Run("SetAPIStatus delegates correctly", func(t *testing.T) {
+ t.Parallel()
+ apiCollector.SetAPIStatus(
+ mcpv1alpha1.APIPhaseReady,
+ "API is ready",
+ "http://example.com",
+ )
+
+ assert.True(t, collector.hasChanges)
+ assert.NotNil(t, collector.apiStatus)
+ assert.Equal(t, mcpv1alpha1.APIPhaseReady, collector.apiStatus.Phase)
+ assert.Equal(t, "API is ready", collector.apiStatus.Message)
+ assert.Equal(t, "http://example.com", collector.apiStatus.Endpoint)
+ })
+
+ t.Run("SetAPIReadyCondition delegates correctly", func(t *testing.T) {
+ t.Parallel()
+ apiCollector.SetAPIReadyCondition("APIReady", "API is ready", metav1.ConditionTrue)
+
+ assert.True(t, collector.hasChanges)
+ assert.Contains(t, collector.conditions, mcpv1alpha1.ConditionAPIReady)
+ condition := collector.conditions[mcpv1alpha1.ConditionAPIReady]
+ assert.Equal(t, metav1.ConditionTrue, condition.Status)
+ assert.Equal(t, "APIReady", condition.Reason)
+ assert.Equal(t, "API is ready", condition.Message)
+ })
+}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/deriver.go b/cmd/thv-operator/pkg/mcpregistrystatus/deriver.go
new file mode 100644
index 000000000..5d8c6084c
--- /dev/null
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/deriver.go
@@ -0,0 +1,55 @@
+// Package mcpregistrystatus provides status management for MCPRegistry resources.
+package mcpregistrystatus
+
+import (
+ "fmt"
+
+ mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
+)
+
+// DefaultStatusDeriver implements the StatusDeriver interface
+type DefaultStatusDeriver struct{}
+
+// NewDefaultStatusDeriver creates a new DefaultStatusDeriver
+func NewDefaultStatusDeriver() StatusDeriver {
+ return &DefaultStatusDeriver{}
+}
+
+// DeriveOverallStatus derives the overall MCPRegistry phase and message from component statuses
+func (*DefaultStatusDeriver) DeriveOverallStatus(
+ syncStatus *mcpv1alpha1.SyncStatus, apiStatus *mcpv1alpha1.APIStatus) (mcpv1alpha1.MCPRegistryPhase, string) {
+ // Handle sync failures first (highest priority)
+ if syncStatus != nil && syncStatus.Phase == mcpv1alpha1.SyncPhaseFailed {
+ return mcpv1alpha1.MCPRegistryPhaseFailed, fmt.Sprintf("Sync failed: %s", syncStatus.Message)
+ }
+
+ // Handle API failures
+ if apiStatus != nil && apiStatus.Phase == mcpv1alpha1.APIPhaseError {
+ return mcpv1alpha1.MCPRegistryPhaseFailed, fmt.Sprintf("API deployment failed: %s", apiStatus.Message)
+ }
+
+ // Handle sync in progress
+ if syncStatus != nil && syncStatus.Phase == mcpv1alpha1.SyncPhaseSyncing {
+ return mcpv1alpha1.MCPRegistryPhaseSyncing, "Registry data synchronization in progress"
+ }
+
+ // Check if both sync and API are ready
+ syncReady := syncStatus != nil &&
+ (syncStatus.Phase == mcpv1alpha1.SyncPhaseComplete)
+ apiReady := apiStatus != nil && apiStatus.Phase == mcpv1alpha1.APIPhaseReady
+
+ if syncReady && apiReady {
+ return mcpv1alpha1.MCPRegistryPhaseReady, "Registry is ready and API is serving requests"
+ }
+
+ // If sync is complete but API is not ready yet
+ if syncReady {
+ if apiStatus != nil && apiStatus.Phase == mcpv1alpha1.APIPhaseDeploying {
+ return mcpv1alpha1.MCPRegistryPhasePending, "Registry data synced, API deployment in progress"
+ }
+ return mcpv1alpha1.MCPRegistryPhasePending, "Registry data synced, API deployment pending"
+ }
+
+ // Default to pending for initial state or unknown combinations
+ return mcpv1alpha1.MCPRegistryPhasePending, "Registry initialization in progress"
+}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/deriver_test.go b/cmd/thv-operator/pkg/mcpregistrystatus/deriver_test.go
new file mode 100644
index 000000000..639095f0f
--- /dev/null
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/deriver_test.go
@@ -0,0 +1,244 @@
+package mcpregistrystatus
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
+)
+
+func TestNewDefaultStatusDeriver(t *testing.T) {
+ t.Parallel()
+
+ deriver := NewDefaultStatusDeriver()
+ assert.NotNil(t, deriver)
+ assert.IsType(t, &DefaultStatusDeriver{}, deriver)
+}
+
+func TestDeriveOverallStatus(t *testing.T) {
+ t.Parallel()
+
+ deriver := &DefaultStatusDeriver{}
+
+ tests := []struct {
+ name string
+ syncStatus *mcpv1alpha1.SyncStatus
+ apiStatus *mcpv1alpha1.APIStatus
+ expectedPhase mcpv1alpha1.MCPRegistryPhase
+ expectedMessage string
+ description string
+ }{
+ {
+ name: "sync failed - highest priority",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseFailed,
+ Message: "source unreachable",
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhaseFailed,
+ expectedMessage: "Sync failed: source unreachable",
+ description: "Sync failure should take precedence over API ready state",
+ },
+ {
+ name: "API error when sync is complete",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseComplete,
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseError,
+ Message: "deployment failed",
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhaseFailed,
+ expectedMessage: "API deployment failed: deployment failed",
+ description: "API error should result in failed phase",
+ },
+ {
+ name: "sync in progress",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseSyncing,
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseDeploying,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhaseSyncing,
+ expectedMessage: "Registry data synchronization in progress",
+ description: "Syncing phase should be shown when sync is in progress",
+ },
+ {
+ name: "both sync and API ready",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseComplete,
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhaseReady,
+ expectedMessage: "Registry is ready and API is serving requests",
+ description: "Both components ready should result in ready phase",
+ },
+ {
+ name: "sync complete, API deploying",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseComplete,
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseDeploying,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry data synced, API deployment in progress",
+ description: "Complete sync with deploying API should be pending",
+ },
+ {
+ name: "sync complete, API status missing",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseComplete,
+ },
+ apiStatus: nil,
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry data synced, API deployment pending",
+ description: "Complete sync without API status should be pending",
+ },
+ {
+ name: "both statuses nil",
+ syncStatus: nil,
+ apiStatus: nil,
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry initialization in progress",
+ description: "No status information should default to pending",
+ },
+ {
+ name: "sync nil, API ready",
+ syncStatus: nil,
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry initialization in progress",
+ description: "Missing sync status should default to pending even with ready API",
+ },
+ {
+ name: "sync complete, API unknown phase",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseComplete,
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: "UnknownPhase",
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry data synced, API deployment pending",
+ description: "Unknown API phase should be treated as not ready",
+ },
+ {
+ name: "sync with unknown phase",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: "UnknownSyncPhase",
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ },
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending,
+ expectedMessage: "Registry initialization in progress",
+ description: "Unknown sync phase should default to pending",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ phase, message := deriver.DeriveOverallStatus(tt.syncStatus, tt.apiStatus)
+
+ assert.Equal(t, tt.expectedPhase, phase, tt.description)
+ assert.Equal(t, tt.expectedMessage, message, tt.description)
+ })
+ }
+}
+
+func TestDeriveOverallStatus_PriorityOrdering(t *testing.T) {
+ t.Parallel()
+
+ deriver := &DefaultStatusDeriver{}
+
+ // Test that sync failures take precedence over API errors
+ syncStatus := &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseFailed,
+ Message: "sync failed",
+ }
+ apiStatus := &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseError,
+ Message: "api failed",
+ }
+
+ phase, message := deriver.DeriveOverallStatus(syncStatus, apiStatus)
+
+ assert.Equal(t, mcpv1alpha1.MCPRegistryPhaseFailed, phase)
+ assert.Contains(t, message, "Sync failed")
+ assert.NotContains(t, message, "API deployment failed")
+}
+
+func TestDeriveOverallStatus_SyncingTakesPrecedence(t *testing.T) {
+ t.Parallel()
+
+ deriver := &DefaultStatusDeriver{}
+
+ // Test that syncing takes precedence over API ready state
+ syncStatus := &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseSyncing,
+ }
+ apiStatus := &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ }
+
+ phase, message := deriver.DeriveOverallStatus(syncStatus, apiStatus)
+
+ assert.Equal(t, mcpv1alpha1.MCPRegistryPhaseSyncing, phase)
+ assert.Equal(t, "Registry data synchronization in progress", message)
+}
+
+func TestDeriveOverallStatus_EdgeCases(t *testing.T) {
+ t.Parallel()
+
+ deriver := &DefaultStatusDeriver{}
+
+ tests := []struct {
+ name string
+ syncStatus *mcpv1alpha1.SyncStatus
+ apiStatus *mcpv1alpha1.APIStatus
+ description string
+ }{
+ {
+ name: "empty sync status with empty phase",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: "",
+ },
+ apiStatus: &mcpv1alpha1.APIStatus{
+ Phase: mcpv1alpha1.APIPhaseReady,
+ },
+ description: "Empty sync phase should be handled gracefully",
+ },
+ {
+ name: "sync status with whitespace message",
+ syncStatus: &mcpv1alpha1.SyncStatus{
+ Phase: mcpv1alpha1.SyncPhaseFailed,
+ Message: " ",
+ },
+ apiStatus: nil,
+ description: "Whitespace in error message should be preserved",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ // Should not panic and should return valid phase/message
+ phase, message := deriver.DeriveOverallStatus(tt.syncStatus, tt.apiStatus)
+
+ assert.NotEmpty(t, phase, tt.description)
+ assert.NotEmpty(t, message, tt.description)
+ })
+ }
+}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/mocks/mock_collector.go b/cmd/thv-operator/pkg/mcpregistrystatus/mocks/mock_collector.go
index 4e163e987..031248f88 100644
--- a/cmd/thv-operator/pkg/mcpregistrystatus/mocks/mock_collector.go
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/mocks/mock_collector.go
@@ -3,7 +3,7 @@
//
// Generated by this command:
//
-// mockgen -destination=mocks/mock_collector.go -package=mocks -source=types.go Collector
+// mockgen -destination=mocks/mock_collector.go -package=mocks -source=types.go SyncStatusCollector,APIStatusCollector,StatusDeriver,StatusManager
//
// Package mocks is a generated GoMock package.
@@ -14,105 +14,233 @@ import (
reflect "reflect"
v1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
+ mcpregistrystatus "github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
gomock "go.uber.org/mock/gomock"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
client "sigs.k8s.io/controller-runtime/pkg/client"
)
-// MockCollector is a mock of Collector interface.
-type MockCollector struct {
+// MockSyncStatusCollector is a mock of SyncStatusCollector interface.
+type MockSyncStatusCollector struct {
ctrl *gomock.Controller
- recorder *MockCollectorMockRecorder
+ recorder *MockSyncStatusCollectorMockRecorder
isgomock struct{}
}
-// MockCollectorMockRecorder is the mock recorder for MockCollector.
-type MockCollectorMockRecorder struct {
- mock *MockCollector
+// MockSyncStatusCollectorMockRecorder is the mock recorder for MockSyncStatusCollector.
+type MockSyncStatusCollectorMockRecorder struct {
+ mock *MockSyncStatusCollector
}
-// NewMockCollector creates a new mock instance.
-func NewMockCollector(ctrl *gomock.Controller) *MockCollector {
- mock := &MockCollector{ctrl: ctrl}
- mock.recorder = &MockCollectorMockRecorder{mock}
+// NewMockSyncStatusCollector creates a new mock instance.
+func NewMockSyncStatusCollector(ctrl *gomock.Controller) *MockSyncStatusCollector {
+ mock := &MockSyncStatusCollector{ctrl: ctrl}
+ mock.recorder = &MockSyncStatusCollectorMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
-func (m *MockCollector) EXPECT() *MockCollectorMockRecorder {
+func (m *MockSyncStatusCollector) EXPECT() *MockSyncStatusCollectorMockRecorder {
return m.recorder
}
-// Apply mocks base method.
-func (m *MockCollector) Apply(ctx context.Context, k8sClient client.Client) error {
+// SetSyncCondition mocks base method.
+func (m *MockSyncStatusCollector) SetSyncCondition(condition v1.Condition) {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "Apply", ctx, k8sClient)
- ret0, _ := ret[0].(error)
- return ret0
+ m.ctrl.Call(m, "SetSyncCondition", condition)
}
-// Apply indicates an expected call of Apply.
-func (mr *MockCollectorMockRecorder) Apply(ctx, k8sClient any) *gomock.Call {
+// SetSyncCondition indicates an expected call of SetSyncCondition.
+func (mr *MockSyncStatusCollectorMockRecorder) SetSyncCondition(condition any) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Apply", reflect.TypeOf((*MockCollector)(nil).Apply), ctx, k8sClient)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSyncCondition", reflect.TypeOf((*MockSyncStatusCollector)(nil).SetSyncCondition), condition)
+}
+
+// SetSyncStatus mocks base method.
+func (m *MockSyncStatusCollector) SetSyncStatus(phase v1alpha1.SyncPhase, message string, attemptCount int, lastSyncTime *v1.Time, lastSyncHash string, serverCount int) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "SetSyncStatus", phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
+}
+
+// SetSyncStatus indicates an expected call of SetSyncStatus.
+func (mr *MockSyncStatusCollectorMockRecorder) SetSyncStatus(phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSyncStatus", reflect.TypeOf((*MockSyncStatusCollector)(nil).SetSyncStatus), phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
+}
+
+// MockAPIStatusCollector is a mock of APIStatusCollector interface.
+type MockAPIStatusCollector struct {
+ ctrl *gomock.Controller
+ recorder *MockAPIStatusCollectorMockRecorder
+ isgomock struct{}
+}
+
+// MockAPIStatusCollectorMockRecorder is the mock recorder for MockAPIStatusCollector.
+type MockAPIStatusCollectorMockRecorder struct {
+ mock *MockAPIStatusCollector
+}
+
+// NewMockAPIStatusCollector creates a new mock instance.
+func NewMockAPIStatusCollector(ctrl *gomock.Controller) *MockAPIStatusCollector {
+ mock := &MockAPIStatusCollector{ctrl: ctrl}
+ mock.recorder = &MockAPIStatusCollectorMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockAPIStatusCollector) EXPECT() *MockAPIStatusCollectorMockRecorder {
+ return m.recorder
}
// SetAPIReadyCondition mocks base method.
-func (m *MockCollector) SetAPIReadyCondition(reason, message string, status v1.ConditionStatus) {
+func (m *MockAPIStatusCollector) SetAPIReadyCondition(reason, message string, status v1.ConditionStatus) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetAPIReadyCondition", reason, message, status)
}
// SetAPIReadyCondition indicates an expected call of SetAPIReadyCondition.
-func (mr *MockCollectorMockRecorder) SetAPIReadyCondition(reason, message, status any) *gomock.Call {
+func (mr *MockAPIStatusCollectorMockRecorder) SetAPIReadyCondition(reason, message, status any) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAPIReadyCondition", reflect.TypeOf((*MockCollector)(nil).SetAPIReadyCondition), reason, message, status)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAPIReadyCondition", reflect.TypeOf((*MockAPIStatusCollector)(nil).SetAPIReadyCondition), reason, message, status)
}
// SetAPIStatus mocks base method.
-func (m *MockCollector) SetAPIStatus(phase v1alpha1.APIPhase, message, endpoint string) {
+func (m *MockAPIStatusCollector) SetAPIStatus(phase v1alpha1.APIPhase, message, endpoint string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetAPIStatus", phase, message, endpoint)
}
// SetAPIStatus indicates an expected call of SetAPIStatus.
-func (mr *MockCollectorMockRecorder) SetAPIStatus(phase, message, endpoint any) *gomock.Call {
+func (mr *MockAPIStatusCollectorMockRecorder) SetAPIStatus(phase, message, endpoint any) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAPIStatus", reflect.TypeOf((*MockCollector)(nil).SetAPIStatus), phase, message, endpoint)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAPIStatus", reflect.TypeOf((*MockAPIStatusCollector)(nil).SetAPIStatus), phase, message, endpoint)
+}
+
+// MockStatusDeriver is a mock of StatusDeriver interface.
+type MockStatusDeriver struct {
+ ctrl *gomock.Controller
+ recorder *MockStatusDeriverMockRecorder
+ isgomock struct{}
}
-// SetMessage mocks base method.
-func (m *MockCollector) SetMessage(message string) {
+// MockStatusDeriverMockRecorder is the mock recorder for MockStatusDeriver.
+type MockStatusDeriverMockRecorder struct {
+ mock *MockStatusDeriver
+}
+
+// NewMockStatusDeriver creates a new mock instance.
+func NewMockStatusDeriver(ctrl *gomock.Controller) *MockStatusDeriver {
+ mock := &MockStatusDeriver{ctrl: ctrl}
+ mock.recorder = &MockStatusDeriverMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockStatusDeriver) EXPECT() *MockStatusDeriverMockRecorder {
+ return m.recorder
+}
+
+// DeriveOverallStatus mocks base method.
+func (m *MockStatusDeriver) DeriveOverallStatus(syncStatus *v1alpha1.SyncStatus, apiStatus *v1alpha1.APIStatus) (v1alpha1.MCPRegistryPhase, string) {
m.ctrl.T.Helper()
- m.ctrl.Call(m, "SetMessage", message)
+ ret := m.ctrl.Call(m, "DeriveOverallStatus", syncStatus, apiStatus)
+ ret0, _ := ret[0].(v1alpha1.MCPRegistryPhase)
+ ret1, _ := ret[1].(string)
+ return ret0, ret1
}
-// SetMessage indicates an expected call of SetMessage.
-func (mr *MockCollectorMockRecorder) SetMessage(message any) *gomock.Call {
+// DeriveOverallStatus indicates an expected call of DeriveOverallStatus.
+func (mr *MockStatusDeriverMockRecorder) DeriveOverallStatus(syncStatus, apiStatus any) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMessage", reflect.TypeOf((*MockCollector)(nil).SetMessage), message)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeriveOverallStatus", reflect.TypeOf((*MockStatusDeriver)(nil).DeriveOverallStatus), syncStatus, apiStatus)
}
-// SetPhase mocks base method.
-func (m *MockCollector) SetPhase(phase v1alpha1.MCPRegistryPhase) {
+// MockStatusManager is a mock of StatusManager interface.
+type MockStatusManager struct {
+ ctrl *gomock.Controller
+ recorder *MockStatusManagerMockRecorder
+ isgomock struct{}
+}
+
+// MockStatusManagerMockRecorder is the mock recorder for MockStatusManager.
+type MockStatusManagerMockRecorder struct {
+ mock *MockStatusManager
+}
+
+// NewMockStatusManager creates a new mock instance.
+func NewMockStatusManager(ctrl *gomock.Controller) *MockStatusManager {
+ mock := &MockStatusManager{ctrl: ctrl}
+ mock.recorder = &MockStatusManagerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockStatusManager) EXPECT() *MockStatusManagerMockRecorder {
+ return m.recorder
+}
+
+// API mocks base method.
+func (m *MockStatusManager) API() mcpregistrystatus.APIStatusCollector {
m.ctrl.T.Helper()
- m.ctrl.Call(m, "SetPhase", phase)
+ ret := m.ctrl.Call(m, "API")
+ ret0, _ := ret[0].(mcpregistrystatus.APIStatusCollector)
+ return ret0
}
-// SetPhase indicates an expected call of SetPhase.
-func (mr *MockCollectorMockRecorder) SetPhase(phase any) *gomock.Call {
+// API indicates an expected call of API.
+func (mr *MockStatusManagerMockRecorder) API() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPhase", reflect.TypeOf((*MockCollector)(nil).SetPhase), phase)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "API", reflect.TypeOf((*MockStatusManager)(nil).API))
}
-// SetSyncStatus mocks base method.
-func (m *MockCollector) SetSyncStatus(phase v1alpha1.SyncPhase, message string, attemptCount int, lastSyncTime *v1.Time, lastSyncHash string, serverCount int) {
+// Apply mocks base method.
+func (m *MockStatusManager) Apply(ctx context.Context, k8sClient client.Client) error {
m.ctrl.T.Helper()
- m.ctrl.Call(m, "SetSyncStatus", phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
+ ret := m.ctrl.Call(m, "Apply", ctx, k8sClient)
+ ret0, _ := ret[0].(error)
+ return ret0
}
-// SetSyncStatus indicates an expected call of SetSyncStatus.
-func (mr *MockCollectorMockRecorder) SetSyncStatus(phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount any) *gomock.Call {
+// Apply indicates an expected call of Apply.
+func (mr *MockStatusManagerMockRecorder) Apply(ctx, k8sClient any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Apply", reflect.TypeOf((*MockStatusManager)(nil).Apply), ctx, k8sClient)
+}
+
+// SetCondition mocks base method.
+func (m *MockStatusManager) SetCondition(conditionType, reason, message string, status v1.ConditionStatus) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "SetCondition", conditionType, reason, message, status)
+}
+
+// SetCondition indicates an expected call of SetCondition.
+func (mr *MockStatusManagerMockRecorder) SetCondition(conditionType, reason, message, status any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCondition", reflect.TypeOf((*MockStatusManager)(nil).SetCondition), conditionType, reason, message, status)
+}
+
+// SetOverallStatus mocks base method.
+func (m *MockStatusManager) SetOverallStatus(phase v1alpha1.MCPRegistryPhase, message string) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "SetOverallStatus", phase, message)
+}
+
+// SetOverallStatus indicates an expected call of SetOverallStatus.
+func (mr *MockStatusManagerMockRecorder) SetOverallStatus(phase, message any) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetOverallStatus", reflect.TypeOf((*MockStatusManager)(nil).SetOverallStatus), phase, message)
+}
+
+// Sync mocks base method.
+func (m *MockStatusManager) Sync() mcpregistrystatus.SyncStatusCollector {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Sync")
+ ret0, _ := ret[0].(mcpregistrystatus.SyncStatusCollector)
+ return ret0
+}
+
+// Sync indicates an expected call of Sync.
+func (mr *MockStatusManagerMockRecorder) Sync() *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSyncStatus", reflect.TypeOf((*MockCollector)(nil).SetSyncStatus), phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sync", reflect.TypeOf((*MockStatusManager)(nil).Sync))
}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/types.go b/cmd/thv-operator/pkg/mcpregistrystatus/types.go
index 858e9520b..c1d8c7311 100644
--- a/cmd/thv-operator/pkg/mcpregistrystatus/types.go
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/types.go
@@ -10,29 +10,63 @@ import (
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
)
-//go:generate mockgen -destination=mocks/mock_collector.go -package=mocks -source=types.go Collector
+// Error represents a structured error with condition information for operator components
+type Error struct {
+ Err error
+ Message string
+ ConditionType string
+ ConditionReason string
+}
-// Collector defines the interface for collecting MCPRegistry status updates.
-// It provides methods to collect status changes during reconciliation
-// and apply them in a single batch update at the end.
-type Collector interface {
- // SetAPIReadyCondition sets the API ready condition with the specified reason, message, and status
- SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus)
+func (e *Error) Error() string {
+ return e.Message
+}
- // SetPhase sets the MCPRegistry phase in the status (overall phase)
- SetPhase(phase mcpv1alpha1.MCPRegistryPhase)
+func (e *Error) Unwrap() error {
+ return e.Err
+}
- // SetMessage sets the status message (overall message)
- SetMessage(message string)
+//go:generate mockgen -destination=mocks/mock_status.go -package=mocks -source=types.go SyncStatusCollector,APIStatusCollector,StatusDeriver,StatusManager
+// SyncStatusCollector handles sync-related status updates
+type SyncStatusCollector interface {
// SetSyncStatus sets the detailed sync status
- SetSyncStatus(
- phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
+ SetSyncStatus(phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
lastSyncTime *metav1.Time, lastSyncHash string, serverCount int)
+ // SetSyncCondition sets a sync-related condition
+ SetSyncCondition(condition metav1.Condition)
+}
+
+// APIStatusCollector handles API-related status updates
+type APIStatusCollector interface {
// SetAPIStatus sets the detailed API status
SetAPIStatus(phase mcpv1alpha1.APIPhase, message string, endpoint string)
+ // SetAPIReadyCondition sets the API ready condition with the specified reason, message, and status
+ SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus)
+}
+
+// StatusDeriver handles overall status derivation logic
+type StatusDeriver interface {
+ // DeriveOverallStatus derives the overall MCPRegistry phase and message from component statuses
+ DeriveOverallStatus(syncStatus *mcpv1alpha1.SyncStatus, apiStatus *mcpv1alpha1.APIStatus) (mcpv1alpha1.MCPRegistryPhase, string)
+}
+
+// StatusManager orchestrates all status updates and provides access to domain-specific collectors
+type StatusManager interface {
+ // Sync returns the sync status collector
+ Sync() SyncStatusCollector
+
+ // API returns the API status collector
+ API() APIStatusCollector
+
+ // SetOverallStatus sets the overall phase and message explicitly (for special cases)
+ SetOverallStatus(phase mcpv1alpha1.MCPRegistryPhase, message string)
+
+ // SetCondition sets a general condition
+ SetCondition(conditionType, reason, message string, status metav1.ConditionStatus)
+
// Apply applies all collected status changes in a single batch update
Apply(ctx context.Context, k8sClient client.Client) error
}
diff --git a/cmd/thv-operator/pkg/mcpregistrystatus/types_test.go b/cmd/thv-operator/pkg/mcpregistrystatus/types_test.go
new file mode 100644
index 000000000..90d4aeb9e
--- /dev/null
+++ b/cmd/thv-operator/pkg/mcpregistrystatus/types_test.go
@@ -0,0 +1,168 @@
+package mcpregistrystatus
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestError_Error(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ err *Error
+ expected string
+ }{
+ {
+ name: "normal message",
+ err: &Error{
+ Err: errors.New("underlying error"),
+ Message: "custom error message",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: "custom error message",
+ },
+ {
+ name: "empty message",
+ err: &Error{
+ Err: errors.New("underlying error"),
+ Message: "",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: "",
+ },
+ {
+ name: "message with special characters",
+ err: &Error{
+ Err: errors.New("underlying error"),
+ Message: "Error: 50% of deployments failed\nRetry needed",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: "Error: 50% of deployments failed\nRetry needed",
+ },
+ {
+ name: "nil underlying error",
+ err: &Error{
+ Err: nil,
+ Message: "custom message without underlying error",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: "custom message without underlying error",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ result := tt.err.Error()
+ assert.Equal(t, tt.expected, result)
+ })
+ }
+}
+
+func TestError_Unwrap(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ err *Error
+ expected error
+ }{
+ {
+ name: "normal underlying error",
+ err: &Error{
+ Err: errors.New("underlying error"),
+ Message: "custom error message",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: errors.New("underlying error"),
+ },
+ {
+ name: "nil underlying error",
+ err: &Error{
+ Err: nil,
+ Message: "custom error message",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ },
+ expected: nil,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ result := tt.err.Unwrap()
+ if tt.expected == nil {
+ assert.Nil(t, result)
+ } else {
+ assert.Equal(t, tt.expected.Error(), result.Error())
+ }
+ })
+ }
+}
+
+func TestError_Interface(t *testing.T) {
+ t.Parallel()
+
+ // Test that Error implements the error interface
+ var _ error = &Error{}
+
+ // Test error chaining with errors.Is and errors.As
+ originalErr := errors.New("original error")
+ wrappedErr := &Error{
+ Err: originalErr,
+ Message: "wrapped error",
+ ConditionType: "TestCondition",
+ ConditionReason: "TestReason",
+ }
+
+ // Test errors.Is
+ assert.True(t, errors.Is(wrappedErr, originalErr))
+
+ // Test errors.As
+ var targetErr *Error
+ assert.True(t, errors.As(wrappedErr, &targetErr))
+ assert.Equal(t, "wrapped error", targetErr.Message)
+ assert.Equal(t, "TestCondition", targetErr.ConditionType)
+ assert.Equal(t, "TestReason", targetErr.ConditionReason)
+}
+
+func TestError_Fields(t *testing.T) {
+ t.Parallel()
+
+ originalErr := errors.New("original error")
+ err := &Error{
+ Err: originalErr,
+ Message: "custom message",
+ ConditionType: "SyncFailed",
+ ConditionReason: "NetworkError",
+ }
+
+ // Test that all fields are accessible and correct
+ assert.Equal(t, originalErr, err.Err)
+ assert.Equal(t, "custom message", err.Message)
+ assert.Equal(t, "SyncFailed", err.ConditionType)
+ assert.Equal(t, "NetworkError", err.ConditionReason)
+}
+
+func TestError_ZeroValue(t *testing.T) {
+ t.Parallel()
+
+ // Test zero value behavior
+ var err Error
+
+ assert.Equal(t, "", err.Error())
+ assert.Nil(t, err.Unwrap())
+ assert.Equal(t, "", err.ConditionType)
+ assert.Equal(t, "", err.ConditionReason)
+}
diff --git a/cmd/thv-operator/pkg/registryapi/manager.go b/cmd/thv-operator/pkg/registryapi/manager.go
index 807eb4253..7cfc0440f 100644
--- a/cmd/thv-operator/pkg/registryapi/manager.go
+++ b/cmd/thv-operator/pkg/registryapi/manager.go
@@ -6,7 +6,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -43,8 +42,8 @@ func NewManager(
// This method coordinates all aspects of API service including creating/updating the deployment and service,
// checking readiness, and updating the MCPRegistry status with deployment references and endpoint information.
func (m *manager) ReconcileAPIService(
- ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
-) error {
+ ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
+) *mcpregistrystatus.Error {
ctxLogger := log.FromContext(ctx).WithValues("mcpregistry", mcpRegistry.Name)
ctxLogger.Info("Reconciling API service")
@@ -52,33 +51,33 @@ func (m *manager) ReconcileAPIService(
deployment, err := m.ensureDeployment(ctx, mcpRegistry)
if err != nil {
ctxLogger.Error(err, "Failed to ensure deployment")
- // Update status with failure condition
- statusCollector.SetAPIStatus(mcpv1alpha1.APIPhaseError,
- fmt.Sprintf("Failed to ensure deployment: %v", err), "")
- statusCollector.SetAPIReadyCondition("DeploymentFailed",
- fmt.Sprintf("Failed to ensure deployment: %v", err), metav1.ConditionFalse)
- return fmt.Errorf("failed to ensure deployment: %w", err)
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Failed to ensure deployment: %v", err),
+ ConditionType: mcpv1alpha1.ConditionAPIReady,
+ ConditionReason: "DeploymentFailed",
+ }
}
// Step 2: Ensure service exists and is configured correctly
- service, err := m.ensureService(ctx, mcpRegistry)
+ _, err = m.ensureService(ctx, mcpRegistry)
if err != nil {
ctxLogger.Error(err, "Failed to ensure service")
- // Update status with failure condition
- statusCollector.SetAPIStatus(mcpv1alpha1.APIPhaseError,
- fmt.Sprintf("Failed to ensure service: %v", err), "")
- statusCollector.SetAPIReadyCondition("ServiceFailed",
- fmt.Sprintf("Failed to ensure service: %v", err), metav1.ConditionFalse)
- return fmt.Errorf("failed to ensure service: %w", err)
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Failed to ensure service: %v", err),
+ ConditionType: mcpv1alpha1.ConditionAPIReady,
+ ConditionReason: "ServiceFailed",
+ }
}
// Step 3: Check API readiness
isReady := m.CheckAPIReadiness(ctx, deployment)
- // Step 4: Update MCPRegistry status with deployment and service references
- m.updateAPIStatus(ctx, mcpRegistry, deployment, service, isReady, statusCollector)
+ // Note: Status updates are now handled by the controller
+ // The controller can call IsAPIReady to check readiness and update status accordingly
- // Step 5: Log completion status
+ // Step 4: Log completion status
if isReady {
ctxLogger.Info("API service reconciliation completed successfully - API is ready")
} else {
@@ -109,59 +108,6 @@ func (m *manager) IsAPIReady(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRe
return m.CheckAPIReadiness(ctx, deployment)
}
-// updateAPIStatus updates the MCPRegistry status with deployment and service references and API endpoint information
-func (*manager) updateAPIStatus(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
- _ *appsv1.Deployment, service *corev1.Service, isReady bool, statusCollector mcpregistrystatus.Collector) {
- ctxLogger := log.FromContext(ctx)
-
- // Determine API endpoint
- var endpoint string
- if service != nil {
- // Construct internal URL from service information
- endpoint = fmt.Sprintf("http://%s.%s.svc.cluster.local:%d",
- service.Name, service.Namespace, service.Spec.Ports[0].Port)
- }
-
- // Set detailed API status
- var apiPhase mcpv1alpha1.APIPhase
- var reason, message string
-
- if isReady {
- apiPhase = mcpv1alpha1.APIPhaseReady
- reason = "APIReady"
- message = "Registry API is ready and serving requests"
- } else {
- apiPhase = mcpv1alpha1.APIPhaseDeploying
- reason = "APINotReady"
- message = "Registry API deployment is not ready yet"
- }
-
- // Only update API status if it has changed
- currentAPIPhase := mcpv1alpha1.APIPhaseNotStarted // default
- currentMessage := ""
- currentEndpoint := ""
- if mcpRegistry.Status.APIStatus != nil {
- currentAPIPhase = mcpRegistry.Status.APIStatus.Phase
- currentMessage = mcpRegistry.Status.APIStatus.Message
- currentEndpoint = mcpRegistry.Status.APIStatus.Endpoint
- }
-
- // Set API status only if it has changed
- if currentAPIPhase != apiPhase || currentMessage != message || currentEndpoint != endpoint {
- statusCollector.SetAPIStatus(apiPhase, message, endpoint)
- statusCollector.SetAPIReadyCondition(reason, message,
- func() metav1.ConditionStatus {
- if isReady {
- return metav1.ConditionTrue
- }
- return metav1.ConditionFalse
- }())
- }
-
- ctxLogger.V(1).Info("Prepared API status update for batching",
- "apiPhase", apiPhase, "apiReady", isReady)
-}
-
// ConfigureDeploymentStorage configures a deployment with storage-specific requirements.
// This method inverts the dependency by having the deployment manager configure itself
// based on the storage manager type, following the dependency inversion principle.
diff --git a/cmd/thv-operator/pkg/registryapi/manager_test.go b/cmd/thv-operator/pkg/registryapi/manager_test.go
index d251bd4c8..55b9502e2 100644
--- a/cmd/thv-operator/pkg/registryapi/manager_test.go
+++ b/cmd/thv-operator/pkg/registryapi/manager_test.go
@@ -13,7 +13,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
- mcpregistrystatusmocks "github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus/mocks"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/sources"
sourcesmocks "github.com/stacklok/toolhive/cmd/thv-operator/pkg/sources/mocks"
)
@@ -390,92 +389,3 @@ func TestManagerCheckAPIReadiness(t *testing.T) {
})
}
}
-
-func TestManagerUpdateAPIStatus(t *testing.T) {
- t.Parallel()
-
- tests := []struct {
- name string
- service *corev1.Service
- isReady bool
- setupMocks func(*mcpregistrystatusmocks.MockCollector)
- description string
- }{
- {
- name: "ready API with service",
- service: &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-service",
- Namespace: "test-namespace",
- },
- Spec: corev1.ServiceSpec{
- Ports: []corev1.ServicePort{
- {
- Port: 8080,
- },
- },
- },
- },
- isReady: true,
- setupMocks: func(m *mcpregistrystatusmocks.MockCollector) {
- m.EXPECT().SetAPIStatus(mcpv1alpha1.APIPhaseReady, "Registry API is ready and serving requests", "http://test-service.test-namespace.svc.cluster.local:8080")
- m.EXPECT().SetAPIReadyCondition("APIReady", "Registry API is ready and serving requests", metav1.ConditionTrue)
- },
- description: "Should set endpoint and ready condition when API is ready",
- },
- {
- name: "not ready API with service",
- service: &corev1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-service",
- Namespace: "test-namespace",
- },
- Spec: corev1.ServiceSpec{
- Ports: []corev1.ServicePort{
- {
- Port: 8080,
- },
- },
- },
- },
- isReady: false,
- setupMocks: func(m *mcpregistrystatusmocks.MockCollector) {
- m.EXPECT().SetAPIStatus(mcpv1alpha1.APIPhaseDeploying, "Registry API deployment is not ready yet", "http://test-service.test-namespace.svc.cluster.local:8080")
- m.EXPECT().SetAPIReadyCondition("APINotReady", "Registry API deployment is not ready yet", metav1.ConditionFalse)
- },
- description: "Should set endpoint and not ready condition when API is not ready",
- },
- {
- name: "no service provided",
- service: nil,
- isReady: false,
- setupMocks: func(m *mcpregistrystatusmocks.MockCollector) {
- m.EXPECT().SetAPIStatus(mcpv1alpha1.APIPhaseDeploying, "Registry API deployment is not ready yet", "")
- m.EXPECT().SetAPIReadyCondition("APINotReady", "Registry API deployment is not ready yet", metav1.ConditionFalse)
- },
- description: "Should only set condition when no service is provided",
- },
- }
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- t.Parallel()
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
-
- mockCollector := mcpregistrystatusmocks.NewMockCollector(ctrl)
- tt.setupMocks(mockCollector)
-
- manager := &manager{}
- ctx := context.Background()
- mcpRegistry := &mcpv1alpha1.MCPRegistry{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-registry",
- Namespace: "test-namespace",
- },
- }
-
- manager.updateAPIStatus(ctx, mcpRegistry, nil, tt.service, tt.isReady, mockCollector)
- })
- }
-}
diff --git a/cmd/thv-operator/pkg/registryapi/mocks/mock_manager.go b/cmd/thv-operator/pkg/registryapi/mocks/mock_manager.go
index 7ae3963cc..18c0ae40b 100644
--- a/cmd/thv-operator/pkg/registryapi/mocks/mock_manager.go
+++ b/cmd/thv-operator/pkg/registryapi/mocks/mock_manager.go
@@ -72,15 +72,15 @@ func (mr *MockManagerMockRecorder) IsAPIReady(ctx, mcpRegistry any) *gomock.Call
}
// ReconcileAPIService mocks base method.
-func (m *MockManager) ReconcileAPIService(ctx context.Context, mcpRegistry *v1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector) error {
+func (m *MockManager) ReconcileAPIService(ctx context.Context, mcpRegistry *v1alpha1.MCPRegistry) *mcpregistrystatus.Error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "ReconcileAPIService", ctx, mcpRegistry, statusCollector)
- ret0, _ := ret[0].(error)
+ ret := m.ctrl.Call(m, "ReconcileAPIService", ctx, mcpRegistry)
+ ret0, _ := ret[0].(*mcpregistrystatus.Error)
return ret0
}
// ReconcileAPIService indicates an expected call of ReconcileAPIService.
-func (mr *MockManagerMockRecorder) ReconcileAPIService(ctx, mcpRegistry, statusCollector any) *gomock.Call {
+func (mr *MockManagerMockRecorder) ReconcileAPIService(ctx, mcpRegistry any) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileAPIService", reflect.TypeOf((*MockManager)(nil).ReconcileAPIService), ctx, mcpRegistry, statusCollector)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileAPIService", reflect.TypeOf((*MockManager)(nil).ReconcileAPIService), ctx, mcpRegistry)
}
diff --git a/cmd/thv-operator/pkg/registryapi/types.go b/cmd/thv-operator/pkg/registryapi/types.go
index 20ec7ff9e..265b46f4d 100644
--- a/cmd/thv-operator/pkg/registryapi/types.go
+++ b/cmd/thv-operator/pkg/registryapi/types.go
@@ -59,7 +59,7 @@ const (
// Manager handles registry API deployment operations
type Manager interface {
// ReconcileAPIService orchestrates the deployment, service creation, and readiness checking for the registry API
- ReconcileAPIService(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector) error
+ ReconcileAPIService(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) *mcpregistrystatus.Error
// CheckAPIReadiness verifies that the deployed registry-API Deployment is ready
CheckAPIReadiness(ctx context.Context, deployment *appsv1.Deployment) bool
diff --git a/cmd/thv-operator/pkg/sync/manager.go b/cmd/thv-operator/pkg/sync/manager.go
index f1f1f596d..05a2a4755 100644
--- a/cmd/thv-operator/pkg/sync/manager.go
+++ b/cmd/thv-operator/pkg/sync/manager.go
@@ -5,7 +5,6 @@ import (
"fmt"
"time"
- "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -14,6 +13,7 @@ import (
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/filtering"
+ "github.com/stacklok/toolhive/cmd/thv-operator/pkg/mcpregistrystatus"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/sources"
)
@@ -26,8 +26,9 @@ type Result struct {
// Sync reason constants
const (
// Registry state related reasons
- ReasonAlreadyInProgress = "sync-already-in-progress"
- ReasonRegistryNotReady = "registry-not-ready"
+ ReasonAlreadyInProgress = "sync-already-in-progress"
+ ReasonRegistryNotReady = "registry-not-ready"
+ ReasonRequeueTimeNotElapsed = "requeue-time-not-elapsed"
// Data change related reasons
ReasonSourceDataChanged = "source-data-changed"
@@ -63,13 +64,26 @@ const (
conditionReasonStorageFailed = "StorageFailed"
)
+// Default timing constants for the sync manager
+const (
+ // DefaultSyncRequeueAfterConstant is the constant default requeue interval for sync operations
+ DefaultSyncRequeueAfterConstant = time.Minute * 5
+)
+
+// Configurable timing variables for testing
+var (
+ // DefaultSyncRequeueAfter is the configurable default requeue interval for sync operations
+ // This can be modified in tests to speed up requeue behavior
+ DefaultSyncRequeueAfter = DefaultSyncRequeueAfterConstant
+)
+
// Manager manages synchronization operations for MCPRegistry resources
type Manager interface {
// ShouldSync determines if a sync operation is needed
- ShouldSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (bool, string, *time.Time, error)
+ ShouldSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (bool, string, *time.Time)
// PerformSync executes the complete sync operation
- PerformSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (ctrl.Result, *Result, error)
+ PerformSync(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (ctrl.Result, *Result, *mcpregistrystatus.Error)
// UpdateManualSyncTriggerOnly updates manual sync trigger tracking without performing actual sync
UpdateManualSyncTriggerOnly(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry) (ctrl.Result, error)
@@ -127,21 +141,29 @@ func NewDefaultSyncManager(k8sClient client.Client, scheme *runtime.Scheme,
// ShouldSync determines if a sync operation is needed and when the next sync should occur
func (s *DefaultSyncManager) ShouldSync(
ctx context.Context,
- mcpRegistry *mcpv1alpha1.MCPRegistry) (bool, string, *time.Time, error) {
+ mcpRegistry *mcpv1alpha1.MCPRegistry) (bool, string, *time.Time) {
+ ctxLogger := log.FromContext(ctx)
+
// If registry is currently syncing, don't start another sync
if mcpRegistry.Status.Phase == mcpv1alpha1.MCPRegistryPhaseSyncing {
- return false, ReasonAlreadyInProgress, nil, nil
+ return false, ReasonAlreadyInProgress, nil
}
// Check if sync is needed based on registry state
if syncNeeded := s.isSyncNeededForState(mcpRegistry); syncNeeded {
- return true, ReasonRegistryNotReady, nil, nil
+ if requeueElapsed := s.isRequeueElapsed(mcpRegistry); requeueElapsed {
+ return true, ReasonRegistryNotReady, nil
+ }
+ ctxLogger.Info("Sync not needed because requeue time not elapsed",
+ "requeueTime", DefaultSyncRequeueAfter, "lastAttempt", mcpRegistry.Status.SyncStatus.LastAttempt)
+ return false, ReasonRequeueTimeNotElapsed, nil
}
// Check if source data has changed by comparing hash
dataChanged, err := s.dataChangeDetector.IsDataChanged(ctx, mcpRegistry)
if err != nil {
- return true, ReasonErrorCheckingChanges, nil, err
+ ctxLogger.Error(err, "Failed to determine if data has changed")
+ return true, ReasonErrorCheckingChanges, nil
}
// Check for manual sync trigger first (always update trigger tracking)
@@ -149,29 +171,30 @@ func (s *DefaultSyncManager) ShouldSync(
// Manual sync was requested - but only sync if data has actually changed
if manualSyncRequested {
if dataChanged {
- return true, ReasonManualWithChanges, nil, nil
+ return true, ReasonManualWithChanges, nil
}
// Manual sync requested but no data changes - update trigger tracking only
- return true, ReasonManualNoChanges, nil, nil
+ return true, ReasonManualNoChanges, nil
}
if dataChanged {
- return true, ReasonSourceDataChanged, nil, nil
+ return true, ReasonSourceDataChanged, nil
}
// Data hasn't changed - check if we need to schedule future checks
if mcpRegistry.Spec.SyncPolicy != nil {
_, nextSyncTime, err := s.automaticSyncChecker.IsIntervalSyncNeeded(mcpRegistry)
if err != nil {
- return true, ReasonErrorParsingInterval, nil, err
+ ctxLogger.Error(err, "Failed to determine if interval sync is needed")
+ return true, ReasonErrorParsingInterval, nil
}
// No sync needed since data hasn't changed, but schedule next check
- return false, ReasonUpToDateWithPolicy, &nextSyncTime, nil
+ return false, ReasonUpToDateWithPolicy, &nextSyncTime
}
// No automatic sync policy, registry is up-to-date
- return false, ReasonUpToDateNoPolicy, nil, nil
+ return false, ReasonUpToDateNoPolicy, nil
}
// isSyncNeededForState checks if sync is needed based on the registry's current state
@@ -184,7 +207,7 @@ func (*DefaultSyncManager) isSyncNeededForState(mcpRegistry *mcpv1alpha1.MCPRegi
return true
}
// If sync is not complete, sync is needed
- if syncPhase != mcpv1alpha1.SyncPhaseComplete && syncPhase != mcpv1alpha1.SyncPhaseIdle {
+ if syncPhase != mcpv1alpha1.SyncPhaseComplete {
return true
}
// Sync is complete, no sync needed based on state
@@ -207,20 +230,28 @@ func (*DefaultSyncManager) isSyncNeededForState(mcpRegistry *mcpv1alpha1.MCPRegi
return false
}
+// isRequeueElapsed checks if the requeue time has elapsed
+func (*DefaultSyncManager) isRequeueElapsed(mcpRegistry *mcpv1alpha1.MCPRegistry) bool {
+ if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.LastAttempt != nil {
+ return time.Now().After(mcpRegistry.Status.SyncStatus.LastAttempt.Add(DefaultSyncRequeueAfter))
+ }
+ return true
+}
+
// PerformSync performs the complete sync operation for the MCPRegistry
// The controller is responsible for setting sync status via the status collector
func (s *DefaultSyncManager) PerformSync(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
-) (ctrl.Result, *Result, error) {
+) (ctrl.Result, *Result, *mcpregistrystatus.Error) {
// Fetch and process registry data
fetchResult, err := s.fetchAndProcessRegistryData(ctx, mcpRegistry)
if err != nil {
- return ctrl.Result{RequeueAfter: time.Minute * 5}, nil, err
+ return ctrl.Result{RequeueAfter: DefaultSyncRequeueAfter}, nil, err
}
// Store the processed registry data
if err := s.storeRegistryData(ctx, mcpRegistry, fetchResult); err != nil {
- return ctrl.Result{RequeueAfter: time.Minute * 5}, nil, err
+ return ctrl.Result{RequeueAfter: DefaultSyncRequeueAfter}, nil, err
}
// Update the core registry fields that sync manager owns
@@ -271,64 +302,33 @@ func (s *DefaultSyncManager) Delete(ctx context.Context, mcpRegistry *mcpv1alpha
return s.storageManager.Delete(ctx, mcpRegistry)
}
-// updatePhase updates the MCPRegistry phase and message
-func (s *DefaultSyncManager) updatePhase(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
- phase mcpv1alpha1.MCPRegistryPhase, message string) error {
- mcpRegistry.Status.Phase = phase
- mcpRegistry.Status.Message = message
- return s.client.Status().Update(ctx, mcpRegistry)
-}
-
-// updatePhaseFailedWithCondition updates phase, message and sets a condition
-func (s *DefaultSyncManager) updatePhaseFailedWithCondition(ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
- message string, conditionType string, reason, conditionMessage string) error {
-
- // Refresh object to get latest resourceVersion
- if err := s.client.Get(ctx, client.ObjectKeyFromObject(mcpRegistry), mcpRegistry); err != nil {
- return err
- }
-
- mcpRegistry.Status.Phase = mcpv1alpha1.MCPRegistryPhaseFailed
- mcpRegistry.Status.Message = message
-
- // Set condition
- meta.SetStatusCondition(&mcpRegistry.Status.Conditions, metav1.Condition{
- Type: conditionType,
- Status: metav1.ConditionFalse,
- Reason: reason,
- Message: conditionMessage,
- })
-
- return s.client.Status().Update(ctx, mcpRegistry)
-}
-
// fetchAndProcessRegistryData handles source handler creation, validation, fetch, and filtering
func (s *DefaultSyncManager) fetchAndProcessRegistryData(
ctx context.Context,
- mcpRegistry *mcpv1alpha1.MCPRegistry) (*sources.FetchResult, error) {
+ mcpRegistry *mcpv1alpha1.MCPRegistry) (*sources.FetchResult, *mcpregistrystatus.Error) {
ctxLogger := log.FromContext(ctx)
// Get source handler
sourceHandler, err := s.sourceHandlerFactory.CreateHandler(mcpRegistry.Spec.Source.Type)
if err != nil {
ctxLogger.Error(err, "Failed to create source handler")
- if updateErr := s.updatePhaseFailedWithCondition(ctx, mcpRegistry,
- fmt.Sprintf("Failed to create source handler: %v", err),
- mcpv1alpha1.ConditionSourceAvailable, conditionReasonHandlerCreationFailed, err.Error()); updateErr != nil {
- ctxLogger.Error(updateErr, "Failed to update status after handler creation failure")
+ return nil, &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Failed to create source handler: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSourceAvailable,
+ ConditionReason: conditionReasonHandlerCreationFailed,
}
- return nil, err
}
// Validate source configuration
if err := sourceHandler.Validate(&mcpRegistry.Spec.Source); err != nil {
ctxLogger.Error(err, "Source validation failed")
- if updateErr := s.updatePhaseFailedWithCondition(ctx, mcpRegistry,
- fmt.Sprintf("Source validation failed: %v", err),
- mcpv1alpha1.ConditionSourceAvailable, conditionReasonValidationFailed, err.Error()); updateErr != nil {
- ctxLogger.Error(updateErr, "Failed to update status after validation failure")
+ return nil, &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Source validation failed: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSourceAvailable,
+ ConditionReason: conditionReasonValidationFailed,
}
- return nil, err
}
// Execute fetch operation
@@ -336,12 +336,12 @@ func (s *DefaultSyncManager) fetchAndProcessRegistryData(
if err != nil {
ctxLogger.Error(err, "Fetch operation failed")
// Sync attempt counting is now handled by the controller via status collector
- if updateErr := s.updatePhaseFailedWithCondition(ctx, mcpRegistry,
- fmt.Sprintf("Fetch failed: %v", err),
- mcpv1alpha1.ConditionSyncSuccessful, conditionReasonFetchFailed, err.Error()); updateErr != nil {
- ctxLogger.Error(updateErr, "Failed to update status after fetch failure")
+ return nil, &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Fetch failed: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSyncSuccessful,
+ ConditionReason: conditionReasonFetchFailed,
}
- return nil, err
}
ctxLogger.Info("Registry data fetched successfully from source",
@@ -361,7 +361,7 @@ func (s *DefaultSyncManager) fetchAndProcessRegistryData(
func (s *DefaultSyncManager) applyFilteringIfConfigured(
ctx context.Context,
mcpRegistry *mcpv1alpha1.MCPRegistry,
- fetchResult *sources.FetchResult) error {
+ fetchResult *sources.FetchResult) *mcpregistrystatus.Error {
ctxLogger := log.FromContext(ctx)
if mcpRegistry.Spec.Filter != nil {
@@ -372,12 +372,12 @@ func (s *DefaultSyncManager) applyFilteringIfConfigured(
filteredRegistry, err := s.filterService.ApplyFilters(ctx, fetchResult.Registry, mcpRegistry.Spec.Filter)
if err != nil {
ctxLogger.Error(err, "Registry filtering failed")
- if updateErr := s.updatePhaseFailedWithCondition(ctx, mcpRegistry,
- fmt.Sprintf("Filtering failed: %v", err),
- mcpv1alpha1.ConditionSyncSuccessful, conditionReasonFetchFailed, err.Error()); updateErr != nil {
- ctxLogger.Error(updateErr, "Failed to update status after filtering failure")
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Filtering failed: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSyncSuccessful,
+ ConditionReason: conditionReasonFetchFailed,
}
- return err
}
// Update fetch result with filtered data
@@ -400,17 +400,17 @@ func (s *DefaultSyncManager) applyFilteringIfConfigured(
func (s *DefaultSyncManager) storeRegistryData(
ctx context.Context,
mcpRegistry *mcpv1alpha1.MCPRegistry,
- fetchResult *sources.FetchResult) error {
+ fetchResult *sources.FetchResult) *mcpregistrystatus.Error {
ctxLogger := log.FromContext(ctx)
if err := s.storageManager.Store(ctx, mcpRegistry, fetchResult.Registry); err != nil {
ctxLogger.Error(err, "Failed to store registry data")
- if updateErr := s.updatePhaseFailedWithCondition(ctx, mcpRegistry,
- fmt.Sprintf("Storage failed: %v", err),
- mcpv1alpha1.ConditionSyncSuccessful, conditionReasonStorageFailed, err.Error()); updateErr != nil {
- ctxLogger.Error(updateErr, "Failed to update status after storage failure")
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Storage failed: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSyncSuccessful,
+ ConditionReason: conditionReasonStorageFailed,
}
- return err
}
ctxLogger.Info("Registry data stored successfully",
@@ -425,13 +425,18 @@ func (s *DefaultSyncManager) storeRegistryData(
func (s *DefaultSyncManager) updateCoreRegistryFields(
ctx context.Context,
mcpRegistry *mcpv1alpha1.MCPRegistry,
- fetchResult *sources.FetchResult) error {
+ fetchResult *sources.FetchResult) *mcpregistrystatus.Error {
ctxLogger := log.FromContext(ctx)
// Refresh the object to get latest resourceVersion before final update
if err := s.client.Get(ctx, client.ObjectKeyFromObject(mcpRegistry), mcpRegistry); err != nil {
ctxLogger.Error(err, "Failed to refresh MCPRegistry object")
- return err
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Failed to refresh MCPRegistry object: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSyncSuccessful,
+ ConditionReason: "ObjectRefreshFailed",
+ }
}
// Get storage reference
@@ -453,7 +458,12 @@ func (s *DefaultSyncManager) updateCoreRegistryFields(
// Single final status update
if err := s.client.Status().Update(ctx, mcpRegistry); err != nil {
ctxLogger.Error(err, "Failed to update core registry fields")
- return err
+ return &mcpregistrystatus.Error{
+ Err: err,
+ Message: fmt.Sprintf("Failed to update core registry fields: %v", err),
+ ConditionType: mcpv1alpha1.ConditionSyncSuccessful,
+ ConditionReason: "StatusUpdateFailed",
+ }
}
ctxLogger.Info("MCPRegistry sync completed successfully",
diff --git a/cmd/thv-operator/pkg/sync/manager_additional_test.go b/cmd/thv-operator/pkg/sync/manager_additional_test.go
index a97362ec5..c78eed19d 100644
--- a/cmd/thv-operator/pkg/sync/manager_additional_test.go
+++ b/cmd/thv-operator/pkg/sync/manager_additional_test.go
@@ -54,18 +54,6 @@ func TestDefaultSyncManager_isSyncNeededForState(t *testing.T) {
expected: false,
description: "Should not need sync when sync phase is complete",
},
- {
- name: "sync not needed when sync status is idle",
- mcpRegistry: &mcpv1alpha1.MCPRegistry{
- Status: mcpv1alpha1.MCPRegistryStatus{
- SyncStatus: &mcpv1alpha1.SyncStatus{
- Phase: mcpv1alpha1.SyncPhaseIdle,
- },
- },
- },
- expected: false,
- description: "Should not need sync when sync phase is idle",
- },
{
name: "sync needed when no sync status and overall phase is failed",
mcpRegistry: &mcpv1alpha1.MCPRegistry{
@@ -208,7 +196,7 @@ func TestDefaultSyncManager_isSyncNeededForState_EdgeCases(t *testing.T) {
},
}
result := manager.isSyncNeededForState(registry)
- // Empty phase is treated as needing sync since it's not complete or idle
+ // Empty phase is treated as needing sync since it's not complete
assert.True(t, result, "Should need sync for empty sync phase")
})
}
diff --git a/cmd/thv-operator/pkg/sync/manager_test.go b/cmd/thv-operator/pkg/sync/manager_test.go
index 3e475e12c..b4764c87a 100644
--- a/cmd/thv-operator/pkg/sync/manager_test.go
+++ b/cmd/thv-operator/pkg/sync/manager_test.go
@@ -194,7 +194,7 @@ func TestDefaultSyncManager_ShouldSync(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- syncNeeded, reason, nextSyncTime, err := syncManager.ShouldSync(ctx, tt.mcpRegistry)
+ syncNeeded, reason, nextSyncTime := syncManager.ShouldSync(ctx, tt.mcpRegistry)
// We expect some errors for ConfigMap not found, but that's okay for this test
if tt.expectedSyncNeeded {
@@ -203,7 +203,6 @@ func TestDefaultSyncManager_ShouldSync(t *testing.T) {
} else {
assert.False(t, syncNeeded, "Expected sync not to be needed")
assert.Equal(t, tt.expectedReason, reason, "Expected specific sync reason")
- assert.NoError(t, err, "Should not have error when sync not needed")
}
if tt.expectedNextTime {
@@ -294,9 +293,9 @@ func TestDefaultSyncManager_PerformSync(t *testing.T) {
},
},
sourceConfigMap: nil,
- expectedError: true, // PerformSync now returns errors for controller to handle
- expectedPhase: mcpv1alpha1.MCPRegistryPhaseFailed,
- expectedServerCount: nil, // Don't validate server count for failed sync
+ expectedError: true, // PerformSync now returns errors for controller to handle
+ expectedPhase: mcpv1alpha1.MCPRegistryPhasePending, // Phase is not changed by PerformSync, only by controller
+ expectedServerCount: nil, // Don't validate server count for failed sync
errorContains: "",
validateConditions: false,
},
@@ -495,15 +494,15 @@ func TestDefaultSyncManager_PerformSync(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- result, syncResult, err := syncManager.PerformSync(ctx, tt.mcpRegistry)
+ result, syncResult, syncErr := syncManager.PerformSync(ctx, tt.mcpRegistry)
if tt.expectedError {
- assert.Error(t, err)
+ assert.NotNil(t, syncErr)
if tt.errorContains != "" {
- assert.Contains(t, err.Error(), tt.errorContains)
+ assert.Contains(t, syncErr.Error(), tt.errorContains)
}
} else {
- assert.NoError(t, err)
+ assert.Nil(t, syncErr)
}
// Verify the result
@@ -715,99 +714,6 @@ func TestDefaultSyncManager_Delete(t *testing.T) {
}
}
-func TestDefaultSyncManager_updatePhase(t *testing.T) {
- t.Parallel()
-
- scheme := runtime.NewScheme()
- require.NoError(t, mcpv1alpha1.AddToScheme(scheme))
-
- mcpRegistry := &mcpv1alpha1.MCPRegistry{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-registry",
- Namespace: "test-namespace",
- UID: types.UID("test-uid"),
- },
- Status: mcpv1alpha1.MCPRegistryStatus{
- Phase: mcpv1alpha1.MCPRegistryPhasePending,
- },
- }
-
- fakeClient := fake.NewClientBuilder().
- WithScheme(scheme).
- WithRuntimeObjects(mcpRegistry).
- WithStatusSubresource(&mcpv1alpha1.MCPRegistry{}).
- Build()
-
- sourceHandlerFactory := sources.NewSourceHandlerFactory(fakeClient)
- storageManager := sources.NewConfigMapStorageManager(fakeClient, scheme)
- syncManager := NewDefaultSyncManager(fakeClient, scheme, sourceHandlerFactory, storageManager)
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- err := syncManager.updatePhase(ctx, mcpRegistry, mcpv1alpha1.MCPRegistryPhaseSyncing, "Test message")
- assert.NoError(t, err)
-
- // Verify the phase was updated - check the modified object directly
- // since the method modifies in place
- assert.Equal(t, mcpv1alpha1.MCPRegistryPhaseSyncing, mcpRegistry.Status.Phase)
- assert.Equal(t, "Test message", mcpRegistry.Status.Message)
-}
-
-func TestDefaultSyncManager_updatePhaseFailedWithCondition(t *testing.T) {
- t.Parallel()
-
- scheme := runtime.NewScheme()
- require.NoError(t, mcpv1alpha1.AddToScheme(scheme))
-
- mcpRegistry := &mcpv1alpha1.MCPRegistry{
- ObjectMeta: metav1.ObjectMeta{
- Name: "test-registry",
- Namespace: "test-namespace",
- UID: types.UID("test-uid"),
- },
- Status: mcpv1alpha1.MCPRegistryStatus{
- Phase: mcpv1alpha1.MCPRegistryPhasePending,
- },
- }
-
- fakeClient := fake.NewClientBuilder().
- WithScheme(scheme).
- WithRuntimeObjects(mcpRegistry).
- WithStatusSubresource(&mcpv1alpha1.MCPRegistry{}).
- Build()
-
- sourceHandlerFactory := sources.NewSourceHandlerFactory(fakeClient)
- storageManager := sources.NewConfigMapStorageManager(fakeClient, scheme)
- syncManager := NewDefaultSyncManager(fakeClient, scheme, sourceHandlerFactory, storageManager)
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- err := syncManager.updatePhaseFailedWithCondition(
- ctx,
- mcpRegistry,
- "Test failure message",
- mcpv1alpha1.ConditionSourceAvailable,
- "TestFailure",
- "Test condition message",
- )
- assert.NoError(t, err)
-
- // Verify the phase and condition were updated - check the modified object directly
- // since the method modifies in place after refreshing from client
- assert.Equal(t, mcpv1alpha1.MCPRegistryPhaseFailed, mcpRegistry.Status.Phase)
- assert.Equal(t, "Test failure message", mcpRegistry.Status.Message)
-
- // Check condition was set
- require.Len(t, mcpRegistry.Status.Conditions, 1)
- condition := mcpRegistry.Status.Conditions[0]
- assert.Equal(t, mcpv1alpha1.ConditionSourceAvailable, condition.Type)
- assert.Equal(t, metav1.ConditionFalse, condition.Status)
- assert.Equal(t, "TestFailure", condition.Reason)
- assert.Equal(t, "Test condition message", condition.Message)
-}
-
func TestIsManualSync(t *testing.T) {
t.Parallel()
diff --git a/cmd/thv-operator/thv-operator b/cmd/thv-operator/thv-operator
new file mode 100755
index 000000000..f6ee69447
Binary files /dev/null and b/cmd/thv-operator/thv-operator differ
diff --git a/deploy/charts/operator-crds/Chart.yaml b/deploy/charts/operator-crds/Chart.yaml
index dc9b59297..576a86706 100644
--- a/deploy/charts/operator-crds/Chart.yaml
+++ b/deploy/charts/operator-crds/Chart.yaml
@@ -2,5 +2,5 @@ apiVersion: v2
name: toolhive-operator-crds
description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.
type: application
-version: 0.0.30
+version: 0.0.31
appVersion: "0.0.1"
diff --git a/deploy/charts/operator-crds/README.md b/deploy/charts/operator-crds/README.md
index 87dc98e92..79394a13b 100644
--- a/deploy/charts/operator-crds/README.md
+++ b/deploy/charts/operator-crds/README.md
@@ -1,7 +1,7 @@
# ToolHive Operator CRDs Helm Chart
-
+

A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.
diff --git a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_mcpregistries.yaml b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_mcpregistries.yaml
index 92f202439..82fdbca38 100644
--- a/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_mcpregistries.yaml
+++ b/deploy/charts/operator-crds/crds/toolhive.stacklok.dev_mcpregistries.yaml
@@ -373,12 +373,10 @@ spec:
phase:
allOf:
- enum:
- - Idle
- Syncing
- Complete
- Failed
- enum:
- - Idle
- Syncing
- Complete
- Failed
diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md
index e637d3dcf..a75ee5b2b 100644
--- a/docs/operator/crd-api.md
+++ b/docs/operator/crd-api.md
@@ -877,14 +877,13 @@ _Underlying type:_ _string_
SyncPhase represents the data synchronization state
_Validation:_
-- Enum: [Idle Syncing Complete Failed]
+- Enum: [Syncing Complete Failed]
_Appears in:_
- [SyncStatus](#syncstatus)
| Field | Description |
| --- | --- |
-| `Idle` | SyncPhaseIdle means no sync is needed or scheduled
|
| `Syncing` | SyncPhaseSyncing means sync is currently in progress
|
| `Complete` | SyncPhaseComplete means sync completed successfully
|
| `Failed` | SyncPhaseFailed means sync failed
|
@@ -922,7 +921,7 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
-| `phase` _[SyncPhase](#syncphase)_ | Phase represents the current synchronization phase | | Enum: [Idle Syncing Complete Failed]
|
+| `phase` _[SyncPhase](#syncphase)_ | Phase represents the current synchronization phase | | Enum: [Syncing Complete Failed]
|
| `message` _string_ | Message provides additional information about the sync status | | |
| `lastAttempt` _[Time](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#time-v1-meta)_ | LastAttempt is the timestamp of the last sync attempt | | |
| `attemptCount` _integer_ | AttemptCount is the number of sync attempts since last success | | Minimum: 0
|