Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions internal/controller/driftsignal_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ import (

// DriftSignalReconciler handles cluster-state DriftSignals written by conductor role=tenant.
//
// Two signal kinds are handled:
// Three signal kinds are handled:
//
// - InfrastructureRunnerConfig (T-23): conductor detected RunnerConfig persistently absent.
// Response: annotate TalosCluster to trigger RunnerConfig recreation.
//
// - InfrastructureTalosCluster: conductor detected Talos version drift (out-of-band upgrade
// on the tenant cluster). Response: patch TalosCluster.status.observedTalosVersion,
// write a synthetic out-of-band TCOR record, bump TCOR revision epoch to observed version.
// - InfrastructureTalosCluster (name prefix "drift-version-"): Talos OS version drift.
// Response: patch TalosCluster.status.observedTalosVersion, write out-of-band TCOR
// record, bump TCOR epoch, create corrective UpgradePolicy (type=talos).
//
// - InfrastructureTalosCluster (name prefix "drift-k8s-version-"): Kubernetes version drift.
// Response: create corrective UpgradePolicy (type=kubernetes) targeting spec.kubernetesVersion.
//
// conductor DriftSignalHandler skips InfrastructureTalosCluster kind signals; they are
// owned exclusively by this reconciler.
Expand Down Expand Up @@ -66,6 +69,9 @@ func (r *DriftSignalReconciler) Reconcile(ctx context.Context, req ctrl.Request)
case "InfrastructureRunnerConfig":
return r.handleRunnerConfigDrift(ctx, log, ds, clusterName)
case "InfrastructureTalosCluster":
if strings.HasPrefix(ds.Name, "drift-k8s-version-") {
return r.handleKubernetesVersionDrift(ctx, log, ds, clusterName)
}
return r.handleTalosVersionDrift(ctx, log, ds, clusterName)
default:
// Other kinds are handled by conductor DriftSignalHandler (pack drift).
Expand Down Expand Up @@ -264,6 +270,55 @@ func (r *DriftSignalReconciler) ensureCorrectiveUpgradePolicy(ctx context.Contex
return nil
}

// handleKubernetesVersionDrift handles DriftSignals emitted by KubernetesVersionDriftLoop.
// It creates a corrective UpgradePolicy (type=kubernetes) targeting the declared
// spec.kubernetesVersion so UpgradePolicyReconciler can submit a kube-upgrade executor Job.
func (r *DriftSignalReconciler) handleKubernetesVersionDrift(ctx context.Context, log logr.Logger, ds *seamcorev1alpha1.DriftSignal, clusterName string) (ctrl.Result, error) {
log.Info("handling Kubernetes version drift",
"cluster", clusterName, "driftReason", ds.Spec.DriftReason)

tc, err := r.getTalosCluster(ctx, clusterName)
if err != nil {
return ctrl.Result{}, err
}
if tc == nil {
log.Info("TalosCluster not found -- marking queued to stop retries", "cluster", clusterName)
return ctrl.Result{}, r.advanceDriftSignalToQueued(ctx, ds)
}

if err := r.ensureCorrectiveKubeUpgradePolicy(ctx, clusterName, tc.Spec.KubernetesVersion); err != nil {
return ctrl.Result{}, fmt.Errorf("DriftSignalReconciler: ensure corrective kube UpgradePolicy %s: %w", clusterName, err)
}
log.Info("corrective kube UpgradePolicy ensured",
"cluster", clusterName, "targetVersion", tc.Spec.KubernetesVersion)

return ctrl.Result{}, r.advanceDriftSignalToQueued(ctx, ds)
}

// ensureCorrectiveKubeUpgradePolicy creates an UpgradePolicy in seam-tenant-{cluster} to
// bring the cluster back to specVersion (the declared spec.kubernetesVersion). Idempotent.
func (r *DriftSignalReconciler) ensureCorrectiveKubeUpgradePolicy(ctx context.Context, clusterName, specVersion string) error {
ns := tenantNS(clusterName)
up := &platformv1alpha1.UpgradePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "drift-k8s-version-" + clusterName,
Namespace: ns,
},
Spec: platformv1alpha1.UpgradePolicySpec{
ClusterRef: platformv1alpha1.LocalObjectRef{
Name: clusterName,
Namespace: rbacProfileNamespace,
},
UpgradeType: platformv1alpha1.UpgradeTypeKubernetes,
TargetKubernetesVersion: specVersion,
},
}
if err := r.Client.Create(ctx, up); err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("create UpgradePolicy drift-k8s-version-%s: %w", clusterName, err)
}
return nil
}

// extractObservedVersion parses the observed talos version from a driftReason string
// produced by TalosVersionDriftLoop. Format: "talos version drift: spec={x} observed={y}".
func extractObservedVersion(driftReason string) string {
Expand Down
79 changes: 79 additions & 0 deletions internal/controller/driftsignal_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,85 @@ func TestDriftSignalReconciler_TalosVersionDrift_FullFlow(t *testing.T) {
}
}

// TestDriftSignalReconciler_K8sVersionDrift_CreatesUpgradePolicy verifies that a pending
// DriftSignal named "drift-k8s-version-{cluster}" with kind=InfrastructureTalosCluster causes:
// - A corrective UpgradePolicy (type=kubernetes) targeting spec.kubernetesVersion
// - The DriftSignal advanced to queued
func TestDriftSignalReconciler_K8sVersionDrift_CreatesUpgradePolicy(t *testing.T) {
scheme := buildDriftSignalTestScheme(t)
if err := platformv1alpha1.AddToScheme(scheme); err != nil {
t.Fatalf("add platform scheme: %v", err)
}

clusterName := "ccs-dev"
tenantNSName := tenantNS(clusterName)
signalName := "drift-k8s-version-" + clusterName

ds := &seamcorev1alpha1.DriftSignal{
ObjectMeta: metav1.ObjectMeta{
Name: signalName, Namespace: tenantNSName, ResourceVersion: "1",
},
Spec: seamcorev1alpha1.DriftSignalSpec{
State: seamcorev1alpha1.DriftSignalStatePending,
CorrelationID: "k8s-version-ccs-dev-123",
ObservedAt: metav1.Now(),
AffectedCRRef: seamcorev1alpha1.DriftAffectedCRRef{
Group: "infrastructure.ontai.dev",
Kind: "InfrastructureTalosCluster",
Name: clusterName,
},
DriftReason: "kubernetes version drift: spec=1.32.2 observed=1.32.3",
},
}

tc := fakeTalosClusterForDrift(clusterName)
tc.Spec.KubernetesVersion = "1.32.2"

tenantNamespaceObj := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: tenantNSName},
}

c := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(ds, tc, tenantNamespaceObj).
WithStatusSubresource(&seamcorev1alpha1.DriftSignal{}).
Build()

r := &DriftSignalReconciler{Client: c}

_, err := r.Reconcile(context.Background(), ctrl.Request{
NamespacedName: types.NamespacedName{Name: signalName, Namespace: tenantNSName},
})
if err != nil {
t.Fatalf("Reconcile: %v", err)
}

// UpgradePolicy must be created with type=kubernetes targeting spec.kubernetesVersion.
gotUP := &platformv1alpha1.UpgradePolicy{}
if err := c.Get(context.Background(), types.NamespacedName{
Name: signalName, Namespace: tenantNSName,
}, gotUP); err != nil {
t.Fatalf("get corrective kube UpgradePolicy: %v", err)
}
if gotUP.Spec.UpgradeType != platformv1alpha1.UpgradeTypeKubernetes {
t.Errorf("UpgradePolicy.Spec.UpgradeType = %q, want %q",
gotUP.Spec.UpgradeType, platformv1alpha1.UpgradeTypeKubernetes)
}
if gotUP.Spec.TargetKubernetesVersion != "1.32.2" {
t.Errorf("UpgradePolicy.Spec.TargetKubernetesVersion = %q, want 1.32.2",
gotUP.Spec.TargetKubernetesVersion)
}

// DriftSignal must be advanced to queued.
gotDS := &seamcorev1alpha1.DriftSignal{}
if err := c.Get(context.Background(), types.NamespacedName{Name: signalName, Namespace: tenantNSName}, gotDS); err != nil {
t.Fatalf("get DriftSignal: %v", err)
}
if gotDS.Spec.State != seamcorev1alpha1.DriftSignalStateQueued {
t.Errorf("DriftSignal.Spec.State = %q, want queued", gotDS.Spec.State)
}
}

// TestDriftSignalReconciler_TalosVersionDrift_NoParsableVersion_AdvancesToQueued verifies
// that a version drift signal without a parseable observed version is still advanced to queued
// (does not retry indefinitely).
Expand Down
68 changes: 46 additions & 22 deletions internal/controller/taloscluster_version_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package controller
//
// Version upgrade path:
// - spec.versionUpgrade=true on a Ready cluster auto-creates an UpgradePolicy CR.
// - Upgrade type derives from which version fields are set:
// talosVersion only → UpgradeTypeTalos; kubernetesVersion only → UpgradeTypeKubernetes;
// both → UpgradeTypeStack (sequential Talos then k8s).
// - The UpgradePolicy reconciler drives the Conductor Job.
// - On completion, UpgradePolicy reconciler patches status.observedTalosVersion.
// - TalosClusterReconciler detects UpgradePolicy Ready=True and clears
// spec.versionUpgrade via spec patch, setting VersionUpgradePending=False.
// - TalosClusterReconciler detects UpgradePolicy Ready=True and sets
// VersionUpgradePending=False.
//
// Anti-regression:
// - If spec.talosVersion < status.observedTalosVersion, the reconciler sets
Expand Down Expand Up @@ -109,24 +112,39 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc
return false, ctrl.Result{}, nil
}

// spec.versionUpgrade=true: validate that talosVersion is set.
if tc.Spec.TalosVersion == "" {
// Determine which version fields are set.
hasTalos := tc.Spec.TalosVersion != ""
hasKube := tc.Spec.KubernetesVersion != ""

// At least one target version must be present.
if !hasTalos && !hasKube {
platformv1alpha1.SetCondition(
&tc.Status.Conditions,
platformv1alpha1.ConditionTypePhaseFailed,
metav1.ConditionTrue,
platformv1alpha1.ReasonTalosVersionRequired,
"spec.versionUpgrade=true requires spec.talosVersion to be set to the target version.",
"spec.versionUpgrade=true requires spec.talosVersion, spec.kubernetesVersion, or both.",
tc.Generation,
)
return true, ctrl.Result{}, nil
}

// Anti-regression: if the specified version would downgrade, block.
if checkVersionRegression(tc) {
// Anti-regression guard applies only when a Talos version change is requested.
if hasTalos && checkVersionRegression(tc) {
return true, ctrl.Result{}, nil
}

// Derive upgrade type from which fields are populated.
var upgradeType platformv1alpha1.UpgradeType
switch {
case hasTalos && hasKube:
upgradeType = platformv1alpha1.UpgradeTypeStack
case hasTalos:
upgradeType = platformv1alpha1.UpgradeTypeTalos
default:
upgradeType = platformv1alpha1.UpgradeTypeKubernetes
}

upName := tc.Name + versionUpgradeSuffix

// Check if the UpgradePolicy already exists.
Expand All @@ -137,7 +155,17 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc
}

if apierrors.IsNotFound(err) {
// Create the UpgradePolicy.
upSpec := platformv1alpha1.UpgradePolicySpec{
ClusterRef: platformv1alpha1.LocalObjectRef{Name: tc.Name, Namespace: tc.Namespace},
UpgradeType: upgradeType,
RollingStrategy: platformv1alpha1.RollingStrategySequential,
}
if hasTalos {
upSpec.TargetTalosVersion = tc.Spec.TalosVersion
}
if hasKube {
upSpec.TargetKubernetesVersion = tc.Spec.KubernetesVersion
}
up := &platformv1alpha1.UpgradePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: upName,
Expand All @@ -147,29 +175,26 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc
"platform.ontai.dev/cluster": tc.Name,
},
},
Spec: platformv1alpha1.UpgradePolicySpec{
ClusterRef: platformv1alpha1.LocalObjectRef{Name: tc.Name, Namespace: tc.Namespace},
UpgradeType: platformv1alpha1.UpgradeTypeTalos,
TargetTalosVersion: tc.Spec.TalosVersion,
RollingStrategy: platformv1alpha1.RollingStrategySequential,
},
Spec: upSpec,
}
if err := r.Client.Create(ctx, up); err != nil {
return true, ctrl.Result{}, fmt.Errorf("reconcileVersionUpgrade: create UpgradePolicy: %w", err)
}
msg := fmt.Sprintf("UpgradePolicy %s created for %s upgrade (talos=%s kubernetes=%s).",
upName, upgradeType, tc.Spec.TalosVersion, tc.Spec.KubernetesVersion)
platformv1alpha1.SetCondition(
&tc.Status.Conditions,
platformv1alpha1.ConditionTypeVersionUpgradePending,
metav1.ConditionTrue,
platformv1alpha1.ReasonVersionUpgradeSubmitted,
fmt.Sprintf("UpgradePolicy %s created for Talos version upgrade to %s.", upName, tc.Spec.TalosVersion),
msg,
tc.Generation,
)
r.Recorder.Eventf(tc, nil, "Normal", "VersionUpgradeSubmitted", "VersionUpgradeSubmitted",
"Created UpgradePolicy %s to upgrade cluster %s to Talos %s",
upName, tc.Name, tc.Spec.TalosVersion)
"Created UpgradePolicy %s for cluster %s (%s)", upName, tc.Name, upgradeType)
logger.Info("created UpgradePolicy for spec.versionUpgrade",
"cluster", tc.Name, "upgradePolicyName", upName, "targetVersion", tc.Spec.TalosVersion)
"cluster", tc.Name, "upgradePolicyName", upName, "upgradeType", upgradeType,
"talosVersion", tc.Spec.TalosVersion, "kubernetesVersion", tc.Spec.KubernetesVersion)
return true, ctrl.Result{RequeueAfter: operationalJobPollInterval}, nil
}

Expand Down Expand Up @@ -199,13 +224,12 @@ func (r *TalosClusterReconciler) reconcileVersionUpgrade(ctx context.Context, tc
platformv1alpha1.ConditionTypeVersionUpgradePending,
metav1.ConditionFalse,
platformv1alpha1.ReasonVersionUpgradeComplete,
fmt.Sprintf("UpgradePolicy %s completed. Cluster upgraded to Talos %s.", upName, tc.Spec.TalosVersion),
fmt.Sprintf("UpgradePolicy %s completed (%s).", upName, upgradeType),
tc.Generation,
)
r.Recorder.Eventf(tc, nil, "Normal", "VersionUpgradeComplete", "VersionUpgradeComplete",
"Cluster %s upgraded to Talos %s via UpgradePolicy %s",
tc.Name, tc.Spec.TalosVersion, upName)
"Cluster %s completed %s upgrade via UpgradePolicy %s", tc.Name, upgradeType, upName)
logger.Info("version upgrade complete via UpgradePolicy",
"cluster", tc.Name, "version", tc.Spec.TalosVersion)
"cluster", tc.Name, "upgradeType", upgradeType)
return true, ctrl.Result{}, nil
}
Loading
Loading