Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions hack/examples/cluster-replica-streaming.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: cluster-replica
spec:
instances: 3
bootstrap:
pg_basebackup:
source: source
replica:
enabled: true
source: source
externalClusters:
- name: source
connectionParameters:
host: cluster-example-rw.default.svc
user: streaming_replica
sslmode: verify-full
dbname: postgres
sslKey:
name: cluster-example-replication
key: tls.key
sslCert:
name: cluster-example-replication
key: tls.crt
sslRootCert:
name: cluster-example-ca
key: ca.crt
plugin:
name: barman-cloud.cloudnative-pg.io
parameters:
barmanObjectName: minio-store
serverName: cluster-example
storage:
size: 1Gi

4 changes: 3 additions & 1 deletion internal/cnpgi/operator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ func getReplicaSourcePlugin(cluster *cnpgv1.Cluster) *cnpgv1.PluginConfiguration
func (config *PluginConfiguration) Validate() error {
err := NewConfigurationError()

if len(config.BarmanObjectName) == 0 && len(config.RecoveryBarmanObjectName) == 0 {
if len(config.BarmanObjectName) == 0 &&
len(config.RecoveryBarmanObjectName) == 0 &&
len(config.ReplicaSourceBarmanObjectName) == 0 {
return err.WithMessage("no reference to barmanObjectName have been included")
}

Expand Down
126 changes: 126 additions & 0 deletions internal/cnpgi/operator/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright © contributors to CloudNativePG, established as
CloudNativePG a Series of LF Projects, LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0
*/

package config

import (
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
)

var _ = Describe("PluginConfiguration.Validate", func() {
It("fails when no barman object name is set", func() {
cfg := &PluginConfiguration{}
Expect(cfg.Validate()).To(HaveOccurred())
})

It("passes when only BarmanObjectName is set (backup/archive)", func() {
cfg := &PluginConfiguration{BarmanObjectName: "my-store"}
Expect(cfg.Validate()).To(Succeed())
})

It("passes when only RecoveryBarmanObjectName is set (recovery bootstrap)", func() {
cfg := &PluginConfiguration{RecoveryBarmanObjectName: "my-store"}
Expect(cfg.Validate()).To(Succeed())
})

It("passes when only ReplicaSourceBarmanObjectName is set (pg_basebackup replica cluster)", func() {
cfg := &PluginConfiguration{ReplicaSourceBarmanObjectName: "my-store"}
Expect(cfg.Validate()).To(Succeed())
})
})

var _ = Describe("NewFromCluster", func() {
enabled := true

It("derives the replica source object store for a pg_basebackup replica cluster", func() {
cluster := &cnpgv1.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster-replica", Namespace: "test-ns"},
Spec: cnpgv1.ClusterSpec{
Bootstrap: &cnpgv1.BootstrapConfiguration{
PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{Source: "source"},
},
ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{
Enabled: &enabled,
Source: "source",
},
ExternalClusters: []cnpgv1.ExternalCluster{
{
Name: "source",
PluginConfiguration: &cnpgv1.PluginConfiguration{
Name: metadata.PluginName,
Parameters: map[string]string{
"barmanObjectName": "minio-store",
"serverName": "cluster-example",
},
},
},
},
},
}

cfg := NewFromCluster(cluster)

// The replica source object store is derived from the external cluster plugin,
// while the backup/archive and recovery object stores remain empty: this is the
// distinguishing trait of a pg_basebackup replica cluster (a recovery-bootstrapped
// replica would also populate RecoveryBarmanObjectName).
Expect(cfg.ReplicaSourceBarmanObjectName).To(Equal("minio-store"))
Expect(cfg.ReplicaSourceServerName).To(Equal("cluster-example"))
Expect(cfg.BarmanObjectName).To(BeEmpty())
Expect(cfg.RecoveryBarmanObjectName).To(BeEmpty())

// Validate must accept it, otherwise the lifecycle hook skips sidecar injection.
Expect(cfg.Validate()).To(Succeed())
})

It("ignores a replica source backed by a different plugin", func() {
cluster := &cnpgv1.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: "cluster-replica", Namespace: "test-ns"},
Spec: cnpgv1.ClusterSpec{
Bootstrap: &cnpgv1.BootstrapConfiguration{
PgBaseBackup: &cnpgv1.BootstrapPgBaseBackup{Source: "source"},
},
ReplicaCluster: &cnpgv1.ReplicaClusterConfiguration{
Enabled: &enabled,
Source: "source",
},
ExternalClusters: []cnpgv1.ExternalCluster{
{
Name: "source",
PluginConfiguration: &cnpgv1.PluginConfiguration{
Name: "some-other-plugin.cloudnative-pg.io",
Parameters: map[string]string{"barmanObjectName": "minio-store"},
},
},
},
},
}

cfg := NewFromCluster(cluster)

Expect(cfg.ReplicaSourceBarmanObjectName).To(BeEmpty())
Expect(cfg.Validate()).NotTo(Succeed())
})
})
32 changes: 32 additions & 0 deletions internal/cnpgi/operator/config/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright © contributors to CloudNativePG, established as
CloudNativePG a Series of LF Projects, LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0
*/

package config

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestConfig(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Config Suite")
}
20 changes: 18 additions & 2 deletions internal/cnpgi/operator/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,10 @@ func (impl LifecycleImplementation) collectAdditionalInstanceArgs(
return args
}

// Prefer the cluster object store (backup/archive). If not set, fallback to the recovery object store.
// If neither is configured, no additional args are provided.
// Only one object store provides the sidecar arguments, with precedence:
// BarmanObjectName (backup/archive) > RecoveryBarmanObjectName (recovery
// bootstrap) > ReplicaSourceBarmanObjectName (pg_basebackup replica).
// If none is configured, no additional args are provided.
if len(pluginConfiguration.BarmanObjectName) > 0 {
var barmanObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, pluginConfiguration.GetBarmanObjectKey(), &barmanObjectStore); err != nil {
Expand Down Expand Up @@ -303,6 +305,20 @@ func (impl LifecycleImplementation) collectAdditionalInstanceArgs(
return args, nil
}

if len(pluginConfiguration.ReplicaSourceBarmanObjectName) > 0 {
key := pluginConfiguration.GetReplicaSourceBarmanObjectKey()
var barmanObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, key, &barmanObjectStore); err != nil {
return nil, fmt.Errorf("while getting replica source barman object store %s: %w", key.String(), err)
}
args := barmanObjectStore.Spec.InstanceSidecarConfiguration.AdditionalContainerArgs
args = append(
args,
collectTypedAdditionalArgs(&barmanObjectStore)...,
)
return args, nil
}

return nil, nil
}

Expand Down
22 changes: 19 additions & 3 deletions internal/cnpgi/operator/lifecycle_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (impl LifecycleImplementation) collectSidecarResourcesForPod(
ctx context.Context,
configuration *config.PluginConfiguration,
) (corev1.ResourceRequirements, error) {
// Only one object store provides the sidecar resources, with precedence:
// BarmanObjectName (backup/archive) > RecoveryBarmanObjectName (recovery
// bootstrap) > ReplicaSourceBarmanObjectName (pg_basebackup replica).
if len(configuration.BarmanObjectName) > 0 {
// On a replica cluster that also archives, the designated primary
// will use both the replica source object store and the object store
Expand All @@ -64,12 +67,25 @@ func (impl LifecycleImplementation) collectSidecarResourcesForPod(
}

if len(configuration.RecoveryBarmanObjectName) > 0 {
// On a replica cluster that doesn't archive, the designated primary
// uses only the replica source object store.
// On a cluster recovering from an object store (including log-shipping
// replica clusters, where the recovery and replica source object stores
// coincide), we use the recovery object store for configuring the
// resources of the sidecar container.
var barmanObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &barmanObjectStore); err != nil {
return corev1.ResourceRequirements{}, err
}

return barmanObjectStore.Spec.InstanceSidecarConfiguration.Resources, nil
}

if len(configuration.ReplicaSourceBarmanObjectName) > 0 {
// On a replica cluster bootstrapped via pg_basebackup, the designated
// primary uses only the replica source object store.
// In this case, we use the replica source object store for configuring
// the resources of the sidecar container.
var barmanObjectStore barmancloudv1.ObjectStore
if err := impl.Client.Get(ctx, configuration.GetRecoveryBarmanObjectKey(), &barmanObjectStore); err != nil {
if err := impl.Client.Get(ctx, configuration.GetReplicaSourceBarmanObjectKey(), &barmanObjectStore); err != nil {
return corev1.ResourceRequirements{}, err
}

Expand Down
Loading
Loading