From f5f8a58b6fd69b82015dcd72521a51fc0f287d89 Mon Sep 17 00:00:00 2001 From: ontave Date: Thu, 7 May 2026 07:33:06 +0200 Subject: [PATCH 1/2] feat: k8s version drift remediation in DriftSignalReconciler When a drift-k8s-version-{cluster} DriftSignal arrives (emitted by KubernetesVersionDriftLoop on the tenant conductor), create a corrective UpgradePolicy (type=kubernetes, targetKubernetesVersion=spec.kubernetesVersion) in seam-tenant-{cluster}. UpgradePolicyReconciler picks it up and submits a kube-upgrade executor Job to bring the cluster back to declared state. Routing: InfrastructureTalosCluster signals are now distinguished by name prefix -- drift-k8s-version-* routes to handleKubernetesVersionDrift, all others continue to handleTalosVersionDrift. 1 unit test: TestDriftSignalReconciler_K8sVersionDrift_CreatesUpgradePolicy. All 7 DriftSignal unit tests pass. --- internal/controller/driftsignal_reconciler.go | 63 ++++++++++++++- .../controller/driftsignal_reconciler_test.go | 79 +++++++++++++++++++ 2 files changed, 138 insertions(+), 4 deletions(-) diff --git a/internal/controller/driftsignal_reconciler.go b/internal/controller/driftsignal_reconciler.go index b607e67..dbc656b 100644 --- a/internal/controller/driftsignal_reconciler.go +++ b/internal/controller/driftsignal_reconciler.go @@ -21,14 +21,17 @@ import ( // DriftSignalReconciler handles cluster-state DriftSignals written by conductor role=tenant. // -// Two signal kinds are handled: +// Three signal kinds are handled: // // - InfrastructureRunnerConfig (T-23): conductor detected RunnerConfig persistently absent. // Response: annotate TalosCluster to trigger RunnerConfig recreation. // -// - InfrastructureTalosCluster: conductor detected Talos version drift (out-of-band upgrade -// on the tenant cluster). Response: patch TalosCluster.status.observedTalosVersion, -// write a synthetic out-of-band TCOR record, bump TCOR revision epoch to observed version. 
+// - InfrastructureTalosCluster (any name without the "drift-k8s-version-" prefix, e.g. "drift-version-*"): Talos OS version drift. +// Response: patch TalosCluster.status.observedTalosVersion, write out-of-band TCOR +// record, bump TCOR epoch, create corrective UpgradePolicy (type=talos). +// +// - InfrastructureTalosCluster (name prefix "drift-k8s-version-"): Kubernetes version drift. +// Response: create corrective UpgradePolicy (type=kubernetes) targeting spec.kubernetesVersion. // // conductor DriftSignalHandler skips InfrastructureTalosCluster kind signals; they are // owned exclusively by this reconciler. @@ -66,6 +69,9 @@ func (r *DriftSignalReconciler) Reconcile(ctx context.Context, req ctrl.Request) case "InfrastructureRunnerConfig": return r.handleRunnerConfigDrift(ctx, log, ds, clusterName) case "InfrastructureTalosCluster": + if strings.HasPrefix(ds.Name, "drift-k8s-version-") { + return r.handleKubernetesVersionDrift(ctx, log, ds, clusterName) + } return r.handleTalosVersionDrift(ctx, log, ds, clusterName) default: // Other kinds are handled by conductor DriftSignalHandler (pack drift). @@ -264,6 +270,55 @@ func (r *DriftSignalReconciler) ensureCorrectiveUpgradePolicy(ctx context.Contex return nil } +// handleKubernetesVersionDrift handles DriftSignals emitted by KubernetesVersionDriftLoop. +// It creates a corrective UpgradePolicy (type=kubernetes) targeting the declared +// spec.kubernetesVersion so UpgradePolicyReconciler can submit a kube-upgrade executor Job. 
+func (r *DriftSignalReconciler) handleKubernetesVersionDrift(ctx context.Context, log logr.Logger, ds *seamcorev1alpha1.DriftSignal, clusterName string) (ctrl.Result, error) { + log.Info("handling Kubernetes version drift", + "cluster", clusterName, "driftReason", ds.Spec.DriftReason) + + tc, err := r.getTalosCluster(ctx, clusterName) + if err != nil { + return ctrl.Result{}, err + } + if tc == nil { + log.Info("TalosCluster not found -- marking queued to stop retries", "cluster", clusterName) + return ctrl.Result{}, r.advanceDriftSignalToQueued(ctx, ds) + } + + if err := r.ensureCorrectiveKubeUpgradePolicy(ctx, clusterName, tc.Spec.KubernetesVersion); err != nil { + return ctrl.Result{}, fmt.Errorf("DriftSignalReconciler: ensure corrective kube UpgradePolicy %s: %w", clusterName, err) + } + log.Info("corrective kube UpgradePolicy ensured", + "cluster", clusterName, "targetVersion", tc.Spec.KubernetesVersion) + + return ctrl.Result{}, r.advanceDriftSignalToQueued(ctx, ds) +} + +// ensureCorrectiveKubeUpgradePolicy creates an UpgradePolicy in seam-tenant-{cluster} to bring the cluster back to +// specVersion (the declared spec.kubernetesVersion). Idempotent by name only: an AlreadyExists error is ignored, so an existing policy is left unchanged even if its target version differs. 
+func (r *DriftSignalReconciler) ensureCorrectiveKubeUpgradePolicy(ctx context.Context, clusterName, specVersion string) error { + ns := tenantNS(clusterName) + up := &platformv1alpha1.UpgradePolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "drift-k8s-version-" + clusterName, + Namespace: ns, + }, + Spec: platformv1alpha1.UpgradePolicySpec{ + ClusterRef: platformv1alpha1.LocalObjectRef{ + Name: clusterName, + Namespace: rbacProfileNamespace, + }, + UpgradeType: platformv1alpha1.UpgradeTypeKubernetes, + TargetKubernetesVersion: specVersion, + }, + } + if err := r.Client.Create(ctx, up); err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create UpgradePolicy drift-k8s-version-%s: %w", clusterName, err) + } + return nil +} + // extractObservedVersion parses the observed talos version from a driftReason string // produced by TalosVersionDriftLoop. Format: "talos version drift: spec={x} observed={y}". func extractObservedVersion(driftReason string) string { diff --git a/internal/controller/driftsignal_reconciler_test.go b/internal/controller/driftsignal_reconciler_test.go index 5963df0..d269ec1 100644 --- a/internal/controller/driftsignal_reconciler_test.go +++ b/internal/controller/driftsignal_reconciler_test.go @@ -335,6 +335,85 @@ func TestDriftSignalReconciler_TalosVersionDrift_FullFlow(t *testing.T) { } } +// TestDriftSignalReconciler_K8sVersionDrift_CreatesUpgradePolicy verifies that a pending +// DriftSignal named "drift-k8s-version-{cluster}" with kind=InfrastructureTalosCluster causes: +// - A corrective UpgradePolicy (type=kubernetes) targeting spec.kubernetesVersion +// - The DriftSignal advanced to queued +func TestDriftSignalReconciler_K8sVersionDrift_CreatesUpgradePolicy(t *testing.T) { + scheme := buildDriftSignalTestScheme(t) + if err := platformv1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("add platform scheme: %v", err) + } + + clusterName := "ccs-dev" + tenantNSName := tenantNS(clusterName) + signalName := 
"drift-k8s-version-" + clusterName + + ds := &seamcorev1alpha1.DriftSignal{ + ObjectMeta: metav1.ObjectMeta{ + Name: signalName, Namespace: tenantNSName, ResourceVersion: "1", + }, + Spec: seamcorev1alpha1.DriftSignalSpec{ + State: seamcorev1alpha1.DriftSignalStatePending, + CorrelationID: "k8s-version-ccs-dev-123", + ObservedAt: metav1.Now(), + AffectedCRRef: seamcorev1alpha1.DriftAffectedCRRef{ + Group: "infrastructure.ontai.dev", + Kind: "InfrastructureTalosCluster", + Name: clusterName, + }, + DriftReason: "kubernetes version drift: spec=1.32.2 observed=1.32.3", + }, + } + + tc := fakeTalosClusterForDrift(clusterName) + tc.Spec.KubernetesVersion = "1.32.2" + + tenantNamespaceObj := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: tenantNSName}, + } + + c := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(ds, tc, tenantNamespaceObj). + WithStatusSubresource(&seamcorev1alpha1.DriftSignal{}). + Build() + + r := &DriftSignalReconciler{Client: c} + + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: signalName, Namespace: tenantNSName}, + }) + if err != nil { + t.Fatalf("Reconcile: %v", err) + } + + // UpgradePolicy must be created with type=kubernetes targeting spec.kubernetesVersion. + gotUP := &platformv1alpha1.UpgradePolicy{} + if err := c.Get(context.Background(), types.NamespacedName{ + Name: signalName, Namespace: tenantNSName, + }, gotUP); err != nil { + t.Fatalf("get corrective kube UpgradePolicy: %v", err) + } + if gotUP.Spec.UpgradeType != platformv1alpha1.UpgradeTypeKubernetes { + t.Errorf("UpgradePolicy.Spec.UpgradeType = %q, want %q", + gotUP.Spec.UpgradeType, platformv1alpha1.UpgradeTypeKubernetes) + } + if gotUP.Spec.TargetKubernetesVersion != "1.32.2" { + t.Errorf("UpgradePolicy.Spec.TargetKubernetesVersion = %q, want 1.32.2", + gotUP.Spec.TargetKubernetesVersion) + } + + // DriftSignal must be advanced to queued. 
+ gotDS := &seamcorev1alpha1.DriftSignal{} + if err := c.Get(context.Background(), types.NamespacedName{Name: signalName, Namespace: tenantNSName}, gotDS); err != nil { + t.Fatalf("get DriftSignal: %v", err) + } + if gotDS.Spec.State != seamcorev1alpha1.DriftSignalStateQueued { + t.Errorf("DriftSignal.Spec.State = %q, want queued", gotDS.Spec.State) + } +} + // TestDriftSignalReconciler_TalosVersionDrift_NoParsableVersion_AdvancesToQueued verifies // that a version drift signal without a parseable observed version is still advanced to queued // (does not retry indefinitely). From 7e26205007e50f86f906446736380eaf887af06e Mon Sep 17 00:00:00 2001 From: ontave Date: Thu, 7 May 2026 08:07:14 +0200 Subject: [PATCH 2/2] feat: type-aware spec.versionUpgrade -- kubernetes and stack upgrade paths reconcileVersionUpgrade now derives UpgradePolicy type from which version fields are set: talosVersion only -> UpgradeTypeTalos (existing), kubernetesVersion only -> UpgradeTypeKubernetes, both -> UpgradeTypeStack. Two new unit tests: TestTalosCluster_VersionUpgrade_KubernetesOnly_CreatesKubePolicy and TestTalosCluster_VersionUpgrade_Stack_CreatesBothVersions. All 8 version upgrade tests pass. --- .../taloscluster_version_upgrade.go | 68 ++++++---- .../taloscluster_versionupgrade_test.go | 124 ++++++++++++++++++ 2 files changed, 170 insertions(+), 22 deletions(-) diff --git a/internal/controller/taloscluster_version_upgrade.go b/internal/controller/taloscluster_version_upgrade.go index 9d85904..fe03a00 100644 --- a/internal/controller/taloscluster_version_upgrade.go +++ b/internal/controller/taloscluster_version_upgrade.go @@ -5,10 +5,13 @@ package controller // // Version upgrade path: // - spec.versionUpgrade=true on a Ready cluster auto-creates an UpgradePolicy CR. +// - Upgrade type derives from which version fields are set: +// talosVersion only → UpgradeTypeTalos; kubernetesVersion only → UpgradeTypeKubernetes; +// both → UpgradeTypeStack (sequential Talos then k8s). 
// - The UpgradePolicy reconciler drives the Conductor Job. // - On completion, UpgradePolicy reconciler patches status.observedTalosVersion. -// - TalosClusterReconciler detects UpgradePolicy Ready=True and clears -// spec.versionUpgrade via spec patch, setting VersionUpgradePending=False. +// - TalosClusterReconciler detects UpgradePolicy Ready=True and sets +// VersionUpgradePending=False. // // Anti-regression: // - If spec.talosVersion < status.observedTalosVersion, the reconciler sets @@ -109,24 +112,39 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc return false, ctrl.Result{}, nil } - // spec.versionUpgrade=true: validate that talosVersion is set. - if tc.Spec.TalosVersion == "" { + // Determine which version fields are set. + hasTalos := tc.Spec.TalosVersion != "" + hasKube := tc.Spec.KubernetesVersion != "" + + // At least one target version must be present. + if !hasTalos && !hasKube { platformv1alpha1.SetCondition( &tc.Status.Conditions, platformv1alpha1.ConditionTypePhaseFailed, metav1.ConditionTrue, platformv1alpha1.ReasonTalosVersionRequired, - "spec.versionUpgrade=true requires spec.talosVersion to be set to the target version.", + "spec.versionUpgrade=true requires spec.talosVersion, spec.kubernetesVersion, or both.", tc.Generation, ) return true, ctrl.Result{}, nil } - // Anti-regression: if the specified version would downgrade, block. - if checkVersionRegression(tc) { + // Anti-regression guard applies only when a Talos version change is requested. + if hasTalos && checkVersionRegression(tc) { return true, ctrl.Result{}, nil } + // Derive upgrade type from which fields are populated. 
+ var upgradeType platformv1alpha1.UpgradeType + switch { + case hasTalos && hasKube: + upgradeType = platformv1alpha1.UpgradeTypeStack + case hasTalos: + upgradeType = platformv1alpha1.UpgradeTypeTalos + default: + upgradeType = platformv1alpha1.UpgradeTypeKubernetes + } + upName := tc.Name + versionUpgradeSuffix // Check if the UpgradePolicy already exists. @@ -137,7 +155,17 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc } if apierrors.IsNotFound(err) { - // Create the UpgradePolicy. + upSpec := platformv1alpha1.UpgradePolicySpec{ + ClusterRef: platformv1alpha1.LocalObjectRef{Name: tc.Name, Namespace: tc.Namespace}, + UpgradeType: upgradeType, + RollingStrategy: platformv1alpha1.RollingStrategySequential, + } + if hasTalos { + upSpec.TargetTalosVersion = tc.Spec.TalosVersion + } + if hasKube { + upSpec.TargetKubernetesVersion = tc.Spec.KubernetesVersion + } up := &platformv1alpha1.UpgradePolicy{ ObjectMeta: metav1.ObjectMeta{ Name: upName, @@ -147,29 +175,26 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc "platform.ontai.dev/cluster": tc.Name, }, }, - Spec: platformv1alpha1.UpgradePolicySpec{ - ClusterRef: platformv1alpha1.LocalObjectRef{Name: tc.Name, Namespace: tc.Namespace}, - UpgradeType: platformv1alpha1.UpgradeTypeTalos, - TargetTalosVersion: tc.Spec.TalosVersion, - RollingStrategy: platformv1alpha1.RollingStrategySequential, - }, + Spec: upSpec, } if err := r.Client.Create(ctx, up); err != nil { return true, ctrl.Result{}, fmt.Errorf("reconcileVersionUpgrade: create UpgradePolicy: %w", err) } + msg := fmt.Sprintf("UpgradePolicy %s created for %s upgrade (talos=%s kubernetes=%s).", + upName, upgradeType, tc.Spec.TalosVersion, tc.Spec.KubernetesVersion) platformv1alpha1.SetCondition( &tc.Status.Conditions, platformv1alpha1.ConditionTypeVersionUpgradePending, metav1.ConditionTrue, platformv1alpha1.ReasonVersionUpgradeSubmitted, - fmt.Sprintf("UpgradePolicy %s created for Talos version 
upgrade to %s.", upName, tc.Spec.TalosVersion), + msg, tc.Generation, ) r.Recorder.Eventf(tc, nil, "Normal", "VersionUpgradeSubmitted", "VersionUpgradeSubmitted", - "Created UpgradePolicy %s to upgrade cluster %s to Talos %s", - upName, tc.Name, tc.Spec.TalosVersion) + "Created UpgradePolicy %s for cluster %s (%s)", upName, tc.Name, upgradeType) logger.Info("created UpgradePolicy for spec.versionUpgrade", - "cluster", tc.Name, "upgradePolicyName", upName, "targetVersion", tc.Spec.TalosVersion) + "cluster", tc.Name, "upgradePolicyName", upName, "upgradeType", upgradeType, + "talosVersion", tc.Spec.TalosVersion, "kubernetesVersion", tc.Spec.KubernetesVersion) return true, ctrl.Result{RequeueAfter: operationalJobPollInterval}, nil } @@ -199,13 +224,12 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc platformv1alpha1.ConditionTypeVersionUpgradePending, metav1.ConditionFalse, platformv1alpha1.ReasonVersionUpgradeComplete, - fmt.Sprintf("UpgradePolicy %s completed. 
Cluster upgraded to Talos %s.", upName, tc.Spec.TalosVersion), + fmt.Sprintf("UpgradePolicy %s completed (%s).", upName, upgradeType), tc.Generation, ) r.Recorder.Eventf(tc, nil, "Normal", "VersionUpgradeComplete", "VersionUpgradeComplete", - "Cluster %s upgraded to Talos %s via UpgradePolicy %s", - tc.Name, tc.Spec.TalosVersion, upName) + "Cluster %s completed %s upgrade via UpgradePolicy %s", tc.Name, upgradeType, upName) logger.Info("version upgrade complete via UpgradePolicy", - "cluster", tc.Name, "version", tc.Spec.TalosVersion) + "cluster", tc.Name, "upgradeType", upgradeType) return true, ctrl.Result{}, nil } diff --git a/test/unit/controller/taloscluster_versionupgrade_test.go b/test/unit/controller/taloscluster_versionupgrade_test.go index c816dde..15de4f1 100644 --- a/test/unit/controller/taloscluster_versionupgrade_test.go +++ b/test/unit/controller/taloscluster_versionupgrade_test.go @@ -459,3 +459,127 @@ func TestTCOR_RevisionBumpedAfterUpgrade(t *testing.T) { t.Errorf("Operations len = %d after revision bump, want 0", len(tcor.Spec.Operations)) } } + +// TestTalosCluster_VersionUpgrade_KubernetesOnly_CreatesKubePolicy verifies that when +// spec.versionUpgrade=true and only spec.kubernetesVersion is set (talosVersion empty), +// the reconciler creates an UpgradePolicy with UpgradeTypeKubernetes and the correct +// TargetKubernetesVersion. TargetTalosVersion must be empty. +func TestTalosCluster_VersionUpgrade_KubernetesOnly_CreatesKubePolicy(t *testing.T) { + scheme := buildDay2Scheme(t) + // talosVersion="" means no Talos upgrade requested — kubernetesVersion drives it. + tc := buildReadyManagementCluster("ccs-mgmt", "seam-system", "", "v1.9.4") + tc.Spec.KubernetesVersion = "1.32.4" + tc.Spec.VersionUpgrade = true + + c := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(tc). + WithStatusSubresource(tc). 
+ Build() + r := &controller.TalosClusterReconciler{ + Client: c, + Scheme: scheme, + Recorder: clientevents.NewFakeRecorder(32), + } + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "ccs-mgmt", Namespace: "seam-system"}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.RequeueAfter == 0 { + t.Error("expected non-zero RequeueAfter while waiting for UpgradePolicy") + } + + up := &platformv1alpha1.UpgradePolicy{} + if err := c.Get(context.Background(), types.NamespacedName{ + Name: "ccs-mgmt-version-upgrade", + Namespace: "seam-system", + }, up); err != nil { + t.Fatalf("UpgradePolicy not created: %v", err) + } + if up.Spec.UpgradeType != platformv1alpha1.UpgradeTypeKubernetes { + t.Errorf("UpgradeType = %q, want kubernetes", up.Spec.UpgradeType) + } + if up.Spec.TargetKubernetesVersion != "1.32.4" { + t.Errorf("TargetKubernetesVersion = %q, want 1.32.4", up.Spec.TargetKubernetesVersion) + } + if up.Spec.TargetTalosVersion != "" { + t.Errorf("TargetTalosVersion = %q, want empty for kubernetes-only upgrade", up.Spec.TargetTalosVersion) + } + + got := &platformv1alpha1.TalosCluster{} + if err := c.Get(context.Background(), types.NamespacedName{ + Name: "ccs-mgmt", Namespace: "seam-system", + }, got); err != nil { + t.Fatalf("get TalosCluster: %v", err) + } + cond := platformv1alpha1.FindCondition(got.Status.Conditions, platformv1alpha1.ConditionTypeVersionUpgradePending) + if cond == nil || cond.Status != metav1.ConditionTrue { + t.Fatal("VersionUpgradePending not set to True for kubernetes-only upgrade") + } +} + +// TestTalosCluster_VersionUpgrade_Stack_CreatesBothVersions verifies that when +// spec.versionUpgrade=true with both spec.talosVersion and spec.kubernetesVersion set, +// the reconciler creates an UpgradePolicy with UpgradeTypeStack carrying both target +// versions (sequential Talos then k8s upgrade). 
+func TestTalosCluster_VersionUpgrade_Stack_CreatesBothVersions(t *testing.T) { + scheme := buildDay2Scheme(t) + tc := buildReadyManagementCluster("ccs-mgmt", "seam-system", "v1.9.4", "v1.9.3") + tc.Spec.KubernetesVersion = "1.32.4" + tc.Spec.VersionUpgrade = true + + c := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(tc). + WithStatusSubresource(tc). + Build() + r := &controller.TalosClusterReconciler{ + Client: c, + Scheme: scheme, + Recorder: clientevents.NewFakeRecorder(32), + } + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "ccs-mgmt", Namespace: "seam-system"}, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.RequeueAfter == 0 { + t.Error("expected non-zero RequeueAfter while waiting for UpgradePolicy") + } + + up := &platformv1alpha1.UpgradePolicy{} + if err := c.Get(context.Background(), types.NamespacedName{ + Name: "ccs-mgmt-version-upgrade", + Namespace: "seam-system", + }, up); err != nil { + t.Fatalf("UpgradePolicy not created: %v", err) + } + if up.Spec.UpgradeType != platformv1alpha1.UpgradeTypeStack { + t.Errorf("UpgradeType = %q, want stack", up.Spec.UpgradeType) + } + if up.Spec.TargetTalosVersion != "v1.9.4" { + t.Errorf("TargetTalosVersion = %q, want v1.9.4", up.Spec.TargetTalosVersion) + } + if up.Spec.TargetKubernetesVersion != "1.32.4" { + t.Errorf("TargetKubernetesVersion = %q, want 1.32.4", up.Spec.TargetKubernetesVersion) + } + if up.Spec.ClusterRef.Name != "ccs-mgmt" { + t.Errorf("ClusterRef.Name = %q, want ccs-mgmt", up.Spec.ClusterRef.Name) + } + + got := &platformv1alpha1.TalosCluster{} + if err := c.Get(context.Background(), types.NamespacedName{ + Name: "ccs-mgmt", Namespace: "seam-system", + }, got); err != nil { + t.Fatalf("get TalosCluster: %v", err) + } + cond := platformv1alpha1.FindCondition(got.Status.Conditions, platformv1alpha1.ConditionTypeVersionUpgradePending) + if cond == nil || cond.Status != 
metav1.ConditionTrue { + t.Fatal("VersionUpgradePending not set to True for stack upgrade") + } +}