diff --git a/hack/examples/cluster-replica-streaming.yaml b/hack/examples/cluster-replica-streaming.yaml new file mode 100644 index 00000000..1fba84cf --- /dev/null +++ b/hack/examples/cluster-replica-streaming.yaml @@ -0,0 +1,36 @@ +apiVersion: postgresql.cnpg.io/v1 +kind: Cluster +metadata: + name: cluster-replica +spec: + instances: 3 + bootstrap: + pg_basebackup: + source: source + replica: + enabled: true + source: source + externalClusters: + - name: source + connectionParameters: + host: cluster-example-rw.default.svc + user: streaming_replica + sslmode: verify-full + dbname: postgres + sslKey: + name: cluster-example-replication + key: tls.key + sslCert: + name: cluster-example-replication + key: tls.crt + sslRootCert: + name: cluster-example-ca + key: ca.crt + plugin: + name: barman-cloud.cloudnative-pg.io + parameters: + barmanObjectName: minio-store + serverName: cluster-example + storage: + size: 1Gi + diff --git a/internal/cnpgi/operator/config/config.go b/internal/cnpgi/operator/config/config.go index 61c70633..7f16ddaa 100644 --- a/internal/cnpgi/operator/config/config.go +++ b/internal/cnpgi/operator/config/config.go @@ -263,7 +263,9 @@ func getReplicaSourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration func (config *PluginConfiguration) Validate() error { err := NewConfigurationError() - if len(config.BarmanObjectName) == 0 && len(config.RecoveryBarmanObjectName) == 0 { + if len(config.BarmanObjectName) == 0 && + len(config.RecoveryBarmanObjectName) == 0 && + len(config.ReplicaSourceBarmanObjectName) == 0 { return err.WithMessage("no reference to barmanObjectName have been included") } diff --git a/internal/cnpgi/operator/config/config_test.go b/internal/cnpgi/operator/config/config_test.go new file mode 100644 index 00000000..3697875c --- /dev/null +++ b/internal/cnpgi/operator/config/config_test.go @@ -0,0 +1,126 @@ +/* +Copyright © contributors to CloudNativePG, established as +CloudNativePG a Series of LF Projects, LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package config + +import ( + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" +) + +var _ = Describe("PluginConfiguration.Validate", func() { + It("fails when no barman object name is set", func() { + cfg := &PluginConfiguration{} + Expect(cfg.Validate()).To(HaveOccurred()) + }) + + It("passes when only BarmanObjectName is set (backup/archive)", func() { + cfg := &PluginConfiguration{BarmanObjectName: "my-store"} + Expect(cfg.Validate()).To(Succeed()) + }) + + It("passes when only RecoveryBarmanObjectName is set (recovery bootstrap)", func() { + cfg := &PluginConfiguration{RecoveryBarmanObjectName: "my-store"} + Expect(cfg.Validate()).To(Succeed()) + }) + + It("passes when only ReplicaSourceBarmanObjectName is set (pg_basebackup replica cluster)", func() { + cfg := &PluginConfiguration{ReplicaSourceBarmanObjectName: "my-store"} + Expect(cfg.Validate()).To(Succeed()) + }) +}) + +var _ = Describe("NewFromCluster", func() { + enabled := true + + It("derives the replica source object store for a pg_basebackup replica cluster", func() { + cluster := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster-replica", Namespace: "test-ns"}, + Spec: cnpgv1.ClusterSpec{ + Bootstrap: &cnpgv1.BootstrapConfiguration{ + PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{Source: "source"}, + }, + ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ + Enabled: &enabled, + Source: "source", + }, + ExternalClusters: []cnpgv1.ExternalCluster{ + { + Name: "source", + PluginConfiguration: &cnpgv1.PluginConfiguration{ + Name: metadata.PluginName, + Parameters: map[string]string{ + "barmanObjectName": "minio-store", + "serverName": "cluster-example", + }, + }, + }, + }, + }, + } + + cfg := NewFromCluster(cluster) + + // The replica source object store is derived from the external cluster plugin, + // while the backup/archive and recovery object stores remain empty: this is the + // distinguishing trait of a pg_basebackup replica cluster (a recovery-bootstrapped + // replica would also populate RecoveryBarmanObjectName). + Expect(cfg.ReplicaSourceBarmanObjectName).To(Equal("minio-store")) + Expect(cfg.ReplicaSourceServerName).To(Equal("cluster-example")) + Expect(cfg.BarmanObjectName).To(BeEmpty()) + Expect(cfg.RecoveryBarmanObjectName).To(BeEmpty()) + + // Validate must accept it, otherwise the lifecycle hook skips sidecar injection. + Expect(cfg.Validate()).To(Succeed()) + }) + + It("ignores a replica source backed by a different plugin", func() { + cluster := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster-replica", Namespace: "test-ns"}, + Spec: cnpgv1.ClusterSpec{ + Bootstrap: &cnpgv1.BootstrapConfiguration{ + PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{Source: "source"}, + }, + ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{ + Enabled: &enabled, + Source: "source", + }, + ExternalClusters: []cnpgv1.ExternalCluster{ + { + Name: "source", + PluginConfiguration: &cnpgv1.PluginConfiguration{ + Name: "some-other-plugin.cloudnative-pg.io", + Parameters: map[string]string{"barmanObjectName": "minio-store"}, + }, + }, + }, + }, + } + + cfg := NewFromCluster(cluster) + + Expect(cfg.ReplicaSourceBarmanObjectName).To(BeEmpty()) + Expect(cfg.Validate()).NotTo(Succeed()) + }) +}) diff --git a/internal/cnpgi/operator/config/suite_test.go b/internal/cnpgi/operator/config/suite_test.go new file mode 100644 index 00000000..5448bb3e --- /dev/null +++ b/internal/cnpgi/operator/config/suite_test.go @@ -0,0 +1,32 @@ +/* +Copyright © contributors to CloudNativePG, established as +CloudNativePG a Series of LF Projects, LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package config + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Config Suite") +} diff --git a/internal/cnpgi/operator/lifecycle.go b/internal/cnpgi/operator/lifecycle.go index b25af554..776b3014 100644 --- a/internal/cnpgi/operator/lifecycle.go +++ b/internal/cnpgi/operator/lifecycle.go @@ -273,8 +273,10 @@ func (impl LifecycleImplementation) collectAdditionalInstanceArgs( return args } - // Prefer the cluster object store (backup/archive). If not set, fallback to the recovery object store. - // If neither is configured, no additional args are provided. + // Only one object store provides the sidecar arguments, with precedence: + // BarmanObjectName (backup/archive) > RecoveryBarmanObjectName (recovery + // bootstrap) > ReplicaSourceBarmanObjectName (pg_basebackup replica). + // If none is configured, no additional args are provided. if len(pluginConfiguration.BarmanObjectName) > 0 { var barmanObjectStore barmancloudv1.ObjectStore if err := impl.Client.Get(ctx, pluginConfiguration.GetBarmanObjectKey(), &barmanObjectStore); err != nil { @@ -303,6 +305,20 @@ func (impl LifecycleImplementation) collectAdditionalInstanceArgs( return args, nil } + if len(pluginConfiguration.ReplicaSourceBarmanObjectName) > 0 { + key := pluginConfiguration.GetReplicaSourceBarmanObjectKey() + var barmanObjectStore barmancloudv1.ObjectStore + if err := impl.Client.Get(ctx, key, &barmanObjectStore); err != nil { + return nil, fmt.Errorf("while getting replica source barman object store %s: %w", key.String(), err) + } + args := barmanObjectStore.Spec.InstanceSidecarConfiguration.AdditionalContainerArgs + args = append( + args, + collectTypedAdditionalArgs(&barmanObjectStore)..., + ) + return args, nil + } + return nil, nil } diff --git a/internal/cnpgi/operator/lifecycle_resources.go b/internal/cnpgi/operator/lifecycle_resources.go index 12498ee7..02b7b436 100644 --- a/internal/cnpgi/operator/lifecycle_resources.go +++ b/internal/cnpgi/operator/lifecycle_resources.go @@ -48,6 +48,9 @@ func (impl LifecycleImplementation) collectSidecarResourcesForPod( ctx context.Context, configuration *config.PluginConfiguration, ) (corev1.ResourceRequirements, error) { + // Only one object store provides the sidecar resources, with precedence: + // BarmanObjectName (backup/archive) > RecoveryBarmanObjectName (recovery + // bootstrap) > ReplicaSourceBarmanObjectName (pg_basebackup replica). if len(configuration.BarmanObjectName) > 0 { // On a replica cluster that also archives, the designated primary // will use both the replica source object store and the object store @@ -64,12 +67,25 @@ func (impl LifecycleImplementation) collectSidecarResourcesForPod( } if len(configuration.RecoveryBarmanObjectName) > 0 { - // On a replica cluster that doesn't archive, the designated primary - // uses only the replica source object store. + // On a cluster recovering from an object store (including log-shipping + // replica clusters, where the recovery and replica source object stores + // coincide), we use the recovery object store for configuring the + // resources of the sidecar container. + var barmanObjectStore barmancloudv1.ObjectStore + if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &barmanObjectStore); err != nil { + return corev1.ResourceRequirements{}, err + } + + return barmanObjectStore.Spec.InstanceSidecarConfiguration.Resources, nil + } + + if len(configuration.ReplicaSourceBarmanObjectName) > 0 { + // On a replica cluster bootstrapped via pg_basebackup, the designated + // primary uses only the replica source object store. // In this case, we use the replica source object store for configuring // the resources of the sidecar container. var barmanObjectStore barmancloudv1.ObjectStore - if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &barmanObjectStore); err != nil { + if err := impl.Client.Get(ctx, configuration.GetReplicaSourceBarmanObjectKey(), &barmanObjectStore); err != nil { return corev1.ResourceRequirements{}, err } diff --git a/internal/cnpgi/operator/lifecycle_test.go b/internal/cnpgi/operator/lifecycle_test.go index d2755ed3..a4851beb 100644 --- a/internal/cnpgi/operator/lifecycle_test.go +++ b/internal/cnpgi/operator/lifecycle_test.go @@ -28,6 +28,7 @@ import ( barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -293,6 +294,24 @@ var _ = Describe("LifecycleImplementation", func() { Expect(args).To(Equal(recoveryArgs)) }) + It("falls back to replica source object store when primary and recovery not set", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{ + Cluster: cluster, + ReplicaSourceBarmanObjectName: "replica-source-store", + } + replicaArgs := []string{"--replica-a", "--replica-b"} + cli := buildClientFunc( + makeStoreFunc(ns, pc.ReplicaSourceBarmanObjectName, replicaArgs), + ).Build() + + impl := LifecycleImplementation{Client: cli} + args, err := impl.collectAdditionalInstanceArgs(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + Expect(args).To(Equal(replicaArgs)) + }) + It("returns nil when neither object name is configured", func(ctx SpecContext) { ns := "test-ns" cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} @@ -386,6 +405,117 @@ var _ = Describe("LifecycleImplementation", func() { Expect(err).NotTo(HaveOccurred()) Expect(args).To(Equal([]string{"--log-level=info"})) }) + + It("includes --log-level from replica source object store when only replica source set", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{ + Cluster: cluster, + ReplicaSourceBarmanObjectName: "replica-source-store", + } + store := &barmancloudv1.ObjectStore{ + TypeMeta: metav1.TypeMeta{Kind: "ObjectStore", APIVersion: barmancloudv1.GroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{Name: pc.ReplicaSourceBarmanObjectName, Namespace: ns}, + Spec: barmancloudv1.ObjectStoreSpec{ + InstanceSidecarConfiguration: barmancloudv1.InstanceSidecarConfiguration{ + LogLevel: "warning", + }, + }, + } + s := runtime.NewScheme() + barmancloudv1.AddKnownTypes(s) + cli := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(store).Build() + + impl := LifecycleImplementation{Client: cli} + args, err := impl.collectAdditionalInstanceArgs(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + Expect(args).To(Equal([]string{"--log-level=warning"})) + }) + }) + + Describe("collectSidecarResourcesForPod", func() { + makeStoreWithResourcesFunc := func(ns, name string, res corev1.ResourceRequirements) *barmancloudv1.ObjectStore { + return &barmancloudv1.ObjectStore{ + TypeMeta: metav1.TypeMeta{Kind: "ObjectStore", APIVersion: barmancloudv1.GroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns}, + Spec: barmancloudv1.ObjectStoreSpec{ + InstanceSidecarConfiguration: barmancloudv1.InstanceSidecarConfiguration{ + Resources: res, + }, + }, + } + } + + It("uses the cluster object store resources when set", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{Cluster: cluster, BarmanObjectName: "primary-store"} + res := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("64Mi")}, + } + cli := buildClientFunc(makeStoreWithResourcesFunc(ns, pc.BarmanObjectName, res)).Build() + + impl := LifecycleImplementation{Client: cli} + got, err := impl.collectSidecarResourcesForPod(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + gotMem := got.Requests[corev1.ResourceMemory] + Expect(gotMem.String()).To(Equal("64Mi")) + }) + + It("uses the recovery object store resources when only recovery is set", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{Cluster: cluster, RecoveryBarmanObjectName: "recovery-store"} + res := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("128Mi")}, + } + cli := buildClientFunc(makeStoreWithResourcesFunc(ns, pc.RecoveryBarmanObjectName, res)).Build() + + impl := LifecycleImplementation{Client: cli} + got, err := impl.collectSidecarResourcesForPod(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + gotMem := got.Requests[corev1.ResourceMemory] + Expect(gotMem.String()).To(Equal("128Mi")) + }) + + It("uses the replica source object store resources when only replica source is set", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{Cluster: cluster, ReplicaSourceBarmanObjectName: "replica-source-store"} + res := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("256Mi")}, + } + cli := buildClientFunc(makeStoreWithResourcesFunc(ns, pc.ReplicaSourceBarmanObjectName, res)).Build() + + impl := LifecycleImplementation{Client: cli} + got, err := impl.collectSidecarResourcesForPod(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + gotMem := got.Requests[corev1.ResourceMemory] + Expect(gotMem.String()).To(Equal("256Mi")) + }) + + It("returns empty resources when no object store is configured", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{Cluster: cluster} + cli := buildClientFunc().Build() + + impl := LifecycleImplementation{Client: cli} + got, err := impl.collectSidecarResourcesForPod(ctx, pc) + Expect(err).NotTo(HaveOccurred()) + Expect(got).To(Equal(corev1.ResourceRequirements{})) + }) + + It("returns an error if the replica source object store cannot be retrieved", func(ctx SpecContext) { + ns := "test-ns" + cluster := &cnpgv1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "c", Namespace: ns}} + pc := &config.PluginConfiguration{Cluster: cluster, ReplicaSourceBarmanObjectName: "missing-store"} + cli := buildClientFunc().Build() + + impl := LifecycleImplementation{Client: cli} + _, err := impl.collectSidecarResourcesForPod(ctx, pc) + Expect(err).To(HaveOccurred()) + }) }) })