Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions fleetspeak/src/server/db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"math"
"time"

Expand All @@ -46,9 +47,14 @@ import (
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

// ErrBroadcastDisabled is returned on an attempted operation on a disabled
// broadcast or an associated allocation.
var ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired")
var (
// ErrBroadcastDisabled is returned on an attempted operation on a disabled
// broadcast or an associated allocation.
ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired")

// ErrNotSupported is returned when a feature is not supported by the datastore.
ErrNotSupported = errors.New("feature not supported")
)

// A Store describes the full persistence mechanism required by the base
// fleetspeak system. These operations must be thread safe. These must also be
Expand Down Expand Up @@ -318,9 +324,29 @@ func ComputeBroadcastAllocationCleanup(allocationLimit, allocated uint64) (uint6
return allocated - allocationLimit, nil
}

// LabelsEqual returns true if the two lists of labels are equivalent in the
// context of required labels of broadcasts.
func LabelsEqual(l1, l2 []*fspb.Label) bool {
if len(l1) != len(l2) {
return false
}
type label struct{ s, l string }
toSet := func(ls []*fspb.Label) map[label]struct{} {
m := make(map[label]struct{})
for _, l := range ls {
m[label{l.ServiceName, l.Label}] = struct{}{}
}
return m
}
return maps.Equal(toSet(l1), toSet(l2))
}

// BroadcastStore provides methods to store and retrieve information about broadcasts.
type BroadcastStore interface {
// CreateBroadcast stores a new broadcast message.
// If b.GroupName is non-empty, existing active broadcasts with the same
// group, source, destination, and required_labels will be disabled
// before inserting this new broadcast.
CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit uint64) error

// SetBroadcastLimit adjusts the limit of an existing broadcast.
Expand Down
106 changes: 103 additions & 3 deletions fleetspeak/src/server/dbtesting/broadcaststore_suite.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dbtesting

import (
"context"
"bytes"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -32,7 +32,7 @@ func broadcastStoreTest(t *testing.T, ds db.Store) {
fin2 := sertesting.SetServerRetryTime(func(_ uint32) time.Time { return db.Now().Add(time.Minute) })
defer fin2()

ctx := context.Background()
ctx := t.Context()

var bid []ids.BroadcastID

Expand Down Expand Up @@ -254,10 +254,110 @@ func testDisableBroadcasts(t *testing.T, ds db.Store, bid ids.BroadcastID, clien
}
}

func isBroadcastActive(t *testing.T, ds db.Store, bID ids.BroadcastID) bool {
t.Helper()
bs, err := ds.ListActiveBroadcasts(t.Context())
if err != nil {
t.Fatal(err)
}
for _, b := range bs {
if bytes.Equal(b.Broadcast.BroadcastId, bID.Bytes()) {
return true
}
}
return false
}

func testBroadcastReplacement(t *testing.T, ds db.Store) {
ctx := t.Context()

createBC := func(grp, src, dst string, labels ...*fspb.Label) *spb.Broadcast {
return &spb.Broadcast{
GroupName: grp,
Source: &fspb.Address{ServiceName: src},
Destination: &fspb.Address{ServiceName: dst},
RequiredLabels: labels,
MessageType: "Empty",
}
}

l1 := &fspb.Label{ServiceName: "client", Label: "label1"}
l2 := &fspb.Label{ServiceName: "client", Label: "label2"}

for _, tc := range []struct {
desc string
b1 *spb.Broadcast
b2 *spb.Broadcast
wantReplace bool
}{
{
desc: "IdenticalMatchingProperties",
b1: createBC("g1", "src1", "dst1", l1),
b2: createBC("g1", "src1", "dst1", l1),
wantReplace: true,
},
{
desc: "DifferentSource",
b1: createBC("g2", "src1", "dst1", l1),
b2: createBC("g2", "src2", "dst1", l1),
wantReplace: false,
},
{
desc: "DifferentLabels",
b1: createBC("g3", "src1", "dst1", l1),
b2: createBC("g3", "src1", "dst1", l2),
wantReplace: false,
},
{
desc: "DifferentDestination",
b1: createBC("g4", "src1", "dst1", l1),
b2: createBC("g4", "src1", "dst2", l1),
wantReplace: false,
},
{
desc: "DifferentGroup",
b1: createBC("g5", "src1", "dst1", l1),
b2: createBC("g6", "src1", "dst1", l1),
wantReplace: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
id1, _ := ids.RandomBroadcastID()
id2, _ := ids.RandomBroadcastID()
tc.b1.BroadcastId = id1.Bytes()
tc.b2.BroadcastId = id2.Bytes()

if err := ds.CreateBroadcast(ctx, tc.b1, 10); err != nil {
if errors.Is(err, db.ErrNotSupported) {
t.Skip("Broadcast replacement feature not supported by this datastore")
}
t.Fatalf("Failed to create b1: %v", err)
}
if !isBroadcastActive(t, ds, id1) {
t.Fatalf("b1 should be active upon creation")
}

if err := ds.CreateBroadcast(ctx, tc.b2, 10); err != nil {
t.Fatalf("Failed to create b2: %v", err)
}

gotActive := isBroadcastActive(t, ds, id1)
wantActive := !tc.wantReplace
if gotActive != wantActive {
t.Errorf("b1 got active status %v, want %v", gotActive, wantActive)
}
if !isBroadcastActive(t, ds, id2) {
t.Errorf("b2 should always be active")
}
})
}
}

func broadcastStoreTestSuite(t *testing.T, env DbTestEnv) {
t.Run("BroadcastStoreTestSuite", func(t *testing.T) {
runTestSuite(t, env, map[string]func(*testing.T, db.Store){
"BroadcastStoreTest": broadcastStoreTest,
"BroadcastStoreTest": broadcastStoreTest,
"BroadcastReplacementTest": testBroadcastReplacement,
})
})
}
6 changes: 6 additions & 0 deletions fleetspeak/src/server/mysql/broadcaststore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
}
dbB.messageLimit = limit
return d.runInTx(ctx, false, func(tx *sql.Tx) error {
if b.GetGroupName() != "" {
// Failing broadcast requests with a groupname set is preferable to silently failing
// to meet the expected behavior.
return fmt.Errorf("broadcast replacement feature is not supported by the mysql datastore: %w", db.ErrNotSupported)
}

if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+
"broadcast_id, "+
"source_service_name, "+
Expand Down
28 changes: 21 additions & 7 deletions fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ message Broadcast {

// The payload of the broadcast.
google.protobuf.Any data = 6;

// An optional group name that the broadcast gets categorized under.
// If set, this broadcast will replace older broadcasts with the same group,
// source, destination, and required labels on creation. This effectively
// causes clients to only receive the latest broadcast in the same group.
string group_name = 8;
}
5 changes: 5 additions & 0 deletions fleetspeak/src/server/spanner/broadcaststore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
MessageLimit: int64(limit),
}
_, err := d.dbClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
if b.GetGroupName() != "" {
// Failing broadcast requests with a groupname set is preferable to silently failing
// to meet the expected behavior.
return fmt.Errorf("broadcast replacement feature is not supported by the spanner datastore: %w", db.ErrNotSupported)
}
return d.tryCreateBroadcast(txn, &br)
})
return err
Expand Down
6 changes: 6 additions & 0 deletions fleetspeak/src/server/sqlite/broadcaststore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit
}
dbB.messageLimit = limit
return d.runInTx(func(tx *sql.Tx) error {
if b.GetGroupName() != "" {
// Failing broadcast requests with a groupname set is preferable to silently failing
// to meet the expected behavior.
return fmt.Errorf("broadcast replacement feature is not supported by the sqlite datastore: %w", db.ErrNotSupported)
}

if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+
"broadcast_id, "+
"source_service_name, "+
Expand Down
Binary file modified spanner-setup/fleetspeak.pb
Binary file not shown.
Loading