diff --git a/fleetspeak/src/server/db/store.go b/fleetspeak/src/server/db/store.go index 106dd1cc..76717ec7 100644 --- a/fleetspeak/src/server/db/store.go +++ b/fleetspeak/src/server/db/store.go @@ -32,6 +32,7 @@ import ( "errors" "fmt" "io" + "maps" "math" "time" @@ -46,9 +47,14 @@ import ( tspb "google.golang.org/protobuf/types/known/timestamppb" ) -// ErrBroadcastDisabled is returned on an attempted operation on a disabled -// broadcast or an associated allocation. -var ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired") +var ( + // ErrBroadcastDisabled is returned on an attempted operation on a disabled + // broadcast or an associated allocation. + ErrBroadcastDisabled = errors.New("broadcast is disabled and allocation expired") + + // ErrNotSupported is returned when a feature is not supported by the datastore. + ErrNotSupported = errors.New("feature not supported") +) // A Store describes the full persistence mechanism required by the base // fleetspeak system. These operations must be thread safe. These must also be @@ -318,9 +324,29 @@ func ComputeBroadcastAllocationCleanup(allocationLimit, allocated uint64) (uint6 return allocated - allocationLimit, nil } +// LabelsEqual returns true if the two lists of labels are equivalent in the +// context of required labels of broadcasts. +func LabelsEqual(l1, l2 []*fspb.Label) bool { + if len(l1) != len(l2) { + return false + } + type label struct{ s, l string } + toSet := func(ls []*fspb.Label) map[label]struct{} { + m := make(map[label]struct{}) + for _, l := range ls { + m[label{l.ServiceName, l.Label}] = struct{}{} + } + return m + } + return maps.Equal(toSet(l1), toSet(l2)) +} + // BroadcastStore provides methods to store and retrieve information about broadcasts. type BroadcastStore interface { // CreateBroadcast stores a new broadcast message. + // If b.GroupName is non-empty, existing active broadcasts with the same + // group, source, destination, and required_labels will be disabled + // before inserting this new broadcast. CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit uint64) error // SetBroadcastLimit adjusts the limit of an existing broadcast. diff --git a/fleetspeak/src/server/dbtesting/broadcaststore_suite.go b/fleetspeak/src/server/dbtesting/broadcaststore_suite.go index 4ce8026c..427bdb17 100644 --- a/fleetspeak/src/server/dbtesting/broadcaststore_suite.go +++ b/fleetspeak/src/server/dbtesting/broadcaststore_suite.go @@ -1,7 +1,7 @@ package dbtesting import ( - "context" + "bytes" "errors" "testing" "time" @@ -32,7 +32,7 @@ func broadcastStoreTest(t *testing.T, ds db.Store) { fin2 := sertesting.SetServerRetryTime(func(_ uint32) time.Time { return db.Now().Add(time.Minute) }) defer fin2() - ctx := context.Background() + ctx := t.Context() var bid []ids.BroadcastID @@ -254,10 +254,110 @@ func testDisableBroadcasts(t *testing.T, ds db.Store, bid ids.BroadcastID, clien } } +func isBroadcastActive(t *testing.T, ds db.Store, bID ids.BroadcastID) bool { + t.Helper() + bs, err := ds.ListActiveBroadcasts(t.Context()) + if err != nil { + t.Fatal(err) + } + for _, b := range bs { + if bytes.Equal(b.Broadcast.BroadcastId, bID.Bytes()) { + return true + } + } + return false +} + +func testBroadcastReplacement(t *testing.T, ds db.Store) { + ctx := t.Context() + + createBC := func(grp, src, dst string, labels ...*fspb.Label) *spb.Broadcast { + return &spb.Broadcast{ + GroupName: grp, + Source: &fspb.Address{ServiceName: src}, + Destination: &fspb.Address{ServiceName: dst}, + RequiredLabels: labels, + MessageType: "Empty", + } + } + + l1 := &fspb.Label{ServiceName: "client", Label: "label1"} + l2 := &fspb.Label{ServiceName: "client", Label: "label2"} + + for _, tc := range []struct { + desc string + b1 *spb.Broadcast + b2 *spb.Broadcast + wantReplace bool + }{ + { + desc: "IdenticalMatchingProperties", + b1: createBC("g1", "src1", "dst1", l1), + b2: createBC("g1", "src1", "dst1", l1), + wantReplace: true, + }, + { + desc: "DifferentSource", + b1: createBC("g2", "src1", "dst1", l1), + b2: createBC("g2", "src2", "dst1", l1), + wantReplace: false, + }, + { + desc: "DifferentLabels", + b1: createBC("g3", "src1", "dst1", l1), + b2: createBC("g3", "src1", "dst1", l2), + wantReplace: false, + }, + { + desc: "DifferentDestination", + b1: createBC("g4", "src1", "dst1", l1), + b2: createBC("g4", "src1", "dst2", l1), + wantReplace: false, + }, + { + desc: "DifferentGroup", + b1: createBC("g5", "src1", "dst1", l1), + b2: createBC("g6", "src1", "dst1", l1), + wantReplace: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + id1, _ := ids.RandomBroadcastID() + id2, _ := ids.RandomBroadcastID() + tc.b1.BroadcastId = id1.Bytes() + tc.b2.BroadcastId = id2.Bytes() + + if err := ds.CreateBroadcast(ctx, tc.b1, 10); err != nil { + if errors.Is(err, db.ErrNotSupported) { + t.Skip("Broadcast replacement feature not supported by this datastore") + } + t.Fatalf("Failed to create b1: %v", err) + } + if !isBroadcastActive(t, ds, id1) { + t.Fatalf("b1 should be active upon creation") + } + + if err := ds.CreateBroadcast(ctx, tc.b2, 10); err != nil { + t.Fatalf("Failed to create b2: %v", err) + } + + gotActive := isBroadcastActive(t, ds, id1) + wantActive := !tc.wantReplace + if gotActive != wantActive { + t.Errorf("b1 got active status %v, want %v", gotActive, wantActive) + } + if !isBroadcastActive(t, ds, id2) { + t.Errorf("b2 should always be active") + } + }) + } +} + func broadcastStoreTestSuite(t *testing.T, env DbTestEnv) { t.Run("BroadcastStoreTestSuite", func(t *testing.T) { runTestSuite(t, env, map[string]func(*testing.T, db.Store){ - "BroadcastStoreTest": broadcastStoreTest, + "BroadcastStoreTest": broadcastStoreTest, + "BroadcastReplacementTest": testBroadcastReplacement, }) }) } diff --git a/fleetspeak/src/server/mysql/broadcaststore.go b/fleetspeak/src/server/mysql/broadcaststore.go index 814b17e2..c4853b50 100644 --- a/fleetspeak/src/server/mysql/broadcaststore.go +++ b/fleetspeak/src/server/mysql/broadcaststore.go @@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit } dbB.messageLimit = limit return d.runInTx(ctx, false, func(tx *sql.Tx) error { + if b.GetGroupName() != "" { + // Failing broadcast requests with a groupname set is preferable to silently failing + // to meet the expected behavior. + return fmt.Errorf("broadcast replacement feature is not supported by the mysql datastore: %w", db.ErrNotSupported) + } + if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+ "broadcast_id, "+ "source_service_name, "+ diff --git a/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go b/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go index 095b1e5e..fceec9d6 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go +++ b/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.pb.go @@ -50,6 +50,11 @@ type Broadcast struct { ExpirationTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=expiration_time,json=expirationTime,proto3" json:"expiration_time,omitempty"` // The payload of the broadcast. Data *anypb.Any `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` + // An optional group name that the broadcast gets categorized under. + // If set, this broadcast will replace older broadcasts with the same group, + // source, destination, and required labels on creation. This effectively + // causes clients to only receive the latest broadcast in the same group. + GroupName string `protobuf:"bytes,8,opt,name=group_name,json=groupName,proto3" json:"group_name,omitempty"` } func (x *Broadcast) Reset() { @@ -131,6 +136,13 @@ func (x *Broadcast) GetData() *anypb.Any { return nil } +func (x *Broadcast) GetGroupName() string { + if x != nil { + return x.GroupName + } + return "" +} + var File_fleetspeak_src_server_proto_fleetspeak_server_broadcasts_proto protoreflect.FileDescriptor var file_fleetspeak_src_server_proto_fleetspeak_server_broadcasts_proto_rawDesc = []byte{ @@ -146,7 +158,7 @@ var file_fleetspeak_src_server_proto_fleetspeak_server_broadcasts_proto_rawDesc 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe0, 0x02, 0x0a, 0x09, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xff, 0x02, 0x0a, 0x09, 0x42, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x62, 0x72, 0x6f, 0x61, 0x64, 0x63, 0x61, 0x73, 0x74, 0x49, 0x64, 0x12, 0x2b, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, @@ -168,12 +180,14 @@ var file_fleetspeak_src_server_proto_fleetspeak_server_broadcasts_proto_rawDesc 0x78, 0x70, 0x69, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, - 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x4c, 0x5a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x66, 0x6c, 0x65, - 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, - 0x61, 0x6b, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x67, 0x72, 0x6f, 0x75, 0x70, + 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x4c, 0x5a, 0x4a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x66, 0x6c, 0x65, 0x65, + 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, + 0x6b, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x66, 0x6c, 0x65, 0x65, 0x74, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.proto b/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.proto index 8c1e8479..680fbffa 100644 --- a/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.proto +++ b/fleetspeak/src/server/proto/fleetspeak_server/broadcasts.proto @@ -37,4 +37,10 @@ message Broadcast { // The payload of the broadcast. google.protobuf.Any data = 6; + + // An optional group name that the broadcast gets categorized under. + // If set, this broadcast will replace older broadcasts with the same group, + // source, destination, and required labels on creation. This effectively + // causes clients to only receive the latest broadcast in the same group. + string group_name = 8; } diff --git a/fleetspeak/src/server/spanner/broadcaststore.go b/fleetspeak/src/server/spanner/broadcaststore.go index 350beddc..5d4ed63c 100644 --- a/fleetspeak/src/server/spanner/broadcaststore.go +++ b/fleetspeak/src/server/spanner/broadcaststore.go @@ -48,6 +48,11 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit MessageLimit: int64(limit), } _, err := d.dbClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + if b.GetGroupName() != "" { + // Failing broadcast requests with a groupname set is preferable to silently failing + // to meet the expected behavior. + return fmt.Errorf("broadcast replacement feature is not supported by the spanner datastore: %w", db.ErrNotSupported) + } return d.tryCreateBroadcast(txn, &br) }) return err diff --git a/fleetspeak/src/server/sqlite/broadcaststore.go b/fleetspeak/src/server/sqlite/broadcaststore.go index d34bff7f..7e02b9ba 100644 --- a/fleetspeak/src/server/sqlite/broadcaststore.go +++ b/fleetspeak/src/server/sqlite/broadcaststore.go @@ -114,6 +114,12 @@ func (d *Datastore) CreateBroadcast(ctx context.Context, b *spb.Broadcast, limit } dbB.messageLimit = limit return d.runInTx(func(tx *sql.Tx) error { + if b.GetGroupName() != "" { + // Failing broadcast requests with a groupname set is preferable to silently failing + // to meet the expected behavior. + return fmt.Errorf("broadcast replacement feature is not supported by the sqlite datastore: %w", db.ErrNotSupported) + } + if _, err := tx.ExecContext(ctx, "INSERT INTO broadcasts("+ "broadcast_id, "+ "source_service_name, "+ diff --git a/spanner-setup/fleetspeak.pb b/spanner-setup/fleetspeak.pb index 02872976..386f644b 100644 Binary files a/spanner-setup/fleetspeak.pb and b/spanner-setup/fleetspeak.pb differ