Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 65 additions & 30 deletions internal/controller/postgrescluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"context"
"errors"
"fmt"
"time"

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
enterprisev4 "github.com/splunk/splunk-operator/api/v4"
corev1 "k8s.io/api/core/v1"
Expand All @@ -42,9 +40,20 @@ type PostgresClusterReconciler struct {
Scheme *runtime.Scheme
}

const (
retryDelaytimer = time.Second * 15
)
// This struct is used to compare the merged configuration from PostgresClusterClass and PostgresClusterSpec
// in a normalized way, and not to use CNPG-default values which are causing false positive diff state while reconciliation loop.
// It contains only the fields that are relevant for our reconciliation and that we want to compare when deciding whether to update the CNPG Cluster spec or not.
type normalizedCNPGClusterSpec struct {
ImageName string
Instances int
// Parameters we set, instead of complete spec from CNPG
CustomDefinedParameters map[string]string
PgHBA []string
DefaultDatabase string
Owner string
StorageSize string
Resources corev1.ResourceRequirements
}

// +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/status,verbs=get;update;patch
Expand Down Expand Up @@ -94,35 +103,31 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
// 4. Build the desired CNPG Cluster spec based on the merged configuration.
desiredSpec := r.buildCNPGClusterSpec(mergedConfig)

// 5. If CNPG cluster doesn't exist, create it.
// 5. Fetch existing CNPG Cluster or create it if it doesn't exist yet.
existingCNPG := &cnpgv1.Cluster{}
getCNPGClusterErr := r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG)
if apierrors.IsNotFound(getCNPGClusterErr) {
logger.Info("CNPG Cluster not found, creating:", "name", postgresCluster.Name)
cnpgCluster = r.buildCNPGCluster(postgresCluster, mergedConfig)
if buildCNPGClusterErr := r.Create(ctx, cnpgCluster); buildCNPGClusterErr != nil {
logger.Error(buildCNPGClusterErr, "Failed to create CNPG Cluster")
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterBuildFailed", buildCNPGClusterErr.Error())
return res, buildCNPGClusterErr
err = r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG)
switch {
case apierrors.IsNotFound(err):
logger.Info("CNPG Cluster not found, creating", "name", postgresCluster.Name)
newCluster := r.buildCNPGCluster(postgresCluster, mergedConfig)
if err = r.Create(ctx, newCluster); err != nil {
logger.Error(err, "Failed to create CNPG Cluster")
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterBuildFailed", err.Error())
return res, err
}
r.setCondition(postgresCluster, metav1.ConditionTrue, "ClusterBuildSucceeded", fmt.Sprintf("CNPG cluster build Succeeded: %s", postgresCluster.Name))
logger.Info("CNPG Cluster created successfully, requeueing for status update", "name", postgresCluster.Name)
return ctrl.Result{RequeueAfter: retryDelaytimer}, nil
} else if getCNPGClusterErr != nil {
logger.Error(getCNPGClusterErr, "Failed to get CNPG Cluster")
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterGetFailed", getCNPGClusterErr.Error())
return res, getCNPGClusterErr
} else {
cnpgCluster = existingCNPG
}

// 6. Synchronize the existing CNPG Cluster spec with the desired configuration.
if err := r.Get(ctx, types.NamespacedName{Name: cnpgCluster.Name, Namespace: cnpgCluster.Namespace}, cnpgCluster); err != nil {
logger.Error(err, "Failed to fetch CNPG Cluster for update check", "name", cnpgCluster.Name)
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterGetFailed", fmt.Sprintf("Failed to fetch CNPG Cluster for update check: %v", err))
return ctrl.Result{RequeueAfter: retryDelay}, nil
case err != nil:
logger.Error(err, "Failed to get CNPG Cluster")
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterGetFailed", err.Error())
return res, err
}
if !equality.Semantic.DeepEqual(cnpgCluster.Spec, desiredSpec) {
cnpgCluster = existingCNPG

// 6. If CNPG Cluster exists, compare the current spec with the desired spec and update if necessary.
currentNormalizedSpec := normalizeCNPGClusterSpec(cnpgCluster.Spec, mergedConfig.PostgreSQLConfig)
desiredNormalizedSpec := normalizeCNPGClusterSpec(desiredSpec, mergedConfig.PostgreSQLConfig)
if !equality.Semantic.DeepEqual(currentNormalizedSpec, desiredNormalizedSpec) {
originalCluster := cnpgCluster.DeepCopy()
cnpgCluster.Spec = desiredSpec
if patchCNPGClusterErr := r.Patch(ctx, cnpgCluster, client.MergeFrom(originalCluster)); patchCNPGClusterErr != nil {
Expand All @@ -134,6 +139,8 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterUpdateFailed", patchCNPGClusterErr.Error())
return res, patchCNPGClusterErr
}
logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name)
return ctrl.Result{RequeueAfter: retryDelay}, nil
}

// 7. Reconcile ManagedRoles from PostgresCluster to CNPG Cluster
Expand Down Expand Up @@ -188,7 +195,9 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P
return resultConfig
}

// buildCNPGClusterSpec builds the desired CNPG ClusterSpec and returns an error if mandatory fields are missing.
// buildCNPGClusterSpec builds the desired CNPG ClusterSpec.
// IMPORTANT: any field added here must also be added to normalizedCNPGClusterSpec and normalizeCNPGClusterSpec,
// otherwise it will not be included in drift detection and changes will be silently ignored.
func (r *PostgresClusterReconciler) buildCNPGClusterSpec(mergedConfig *enterprisev4.PostgresClusterSpec) cnpgv1.ClusterSpec {

// 3. Build the Spec
Expand Down Expand Up @@ -347,6 +356,32 @@ func (r *PostgresClusterReconciler) reconcileManagedRoles(ctx context.Context, p
return nil
}

func normalizeCNPGClusterSpec(spec cnpgv1.ClusterSpec, customDefinedParameters map[string]string) normalizedCNPGClusterSpec {
normalizedConf := normalizedCNPGClusterSpec{
ImageName: spec.ImageName,
Instances: spec.Instances,
// Parameters intentionally excluded — CNPG injects defaults that we don't change
StorageSize: spec.StorageConfiguration.Size,
Resources: spec.Resources,
}

if len(customDefinedParameters) > 0 {
normalizedConf.CustomDefinedParameters = make(map[string]string)
for k := range customDefinedParameters {
normalizedConf.CustomDefinedParameters[k] = spec.PostgresConfiguration.Parameters[k]
}
}
if len(spec.PostgresConfiguration.PgHBA) > 0 {
normalizedConf.PgHBA = spec.PostgresConfiguration.PgHBA
}

if spec.Bootstrap != nil && spec.Bootstrap.InitDB != nil {
normalizedConf.DefaultDatabase = spec.Bootstrap.InitDB.Database
normalizedConf.Owner = spec.Bootstrap.InitDB.Owner
}
return normalizedConf
}

// SetupWithManager sets up the controller with the Manager.
func (r *PostgresClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Loading