Add Gocb KV range scan via sg-bucket abstraction with Rosmar support #8084
Conversation
Pull request overview
This pull request adds KV (Key-Value) range scan functionality to Sync Gateway through the sg-bucket abstraction layer, enabling both Couchbase Server (via gocb) and Rosmar to perform efficient range scans over document keys.
Changes:
- Adds `Scan()` method implementation for `Collection` that bridges sg-bucket's `RangeScanStore` interface to gocb's range scan API
- Adds `AsRangeScanStore()` helper function following the existing pattern for feature-checking datastore capabilities
- Implements `RangeScanStore` interface in `LeakyDataStore` to support testing with leaky bucket
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| base/collection_rangescan.go | New file implementing the Scan() method for Collection, converting sg-bucket scan types to gocb scan types, and providing an iterator wrapper for scan results |
| base/collection_rangescan_test.go | Comprehensive test coverage for range scan functionality including full range, partial range, IDs-only, prefix scans, empty ranges, and tombstone exclusion |
| base/leaky_datastore.go | Implements RangeScanStore.Scan() by delegating to underlying datastore, adds interface assertion for RangeScanStore |
| base/collection.go | Adds feature flag BucketStoreFeatureRangeScan with version check (7.6+) |
| base/bucket.go | Adds AsRangeScanStore() helper function to check if a datastore supports range scan operations |
```go
if err := item.Content(&body); err == nil {
	result.Body = body
}
```
Errors from item.Content(&body) are silently ignored. Based on the codebase pattern (see base/collection_gocb.go:294-296, base/config_persistence.go:245-247), errors from Content() should be handled rather than ignored. If content cannot be decoded, the caller should be notified. Consider logging the error or including error information in the returned ScanResultItem structure so consumers can detect and handle decode failures.
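A minimal sketch of the suggested pattern, using a hypothetical `ScanResultItem` with an added `Error` field and plain `json.Unmarshal` as a stand-in for `item.Content()` (these names and shapes are illustrative, not the actual sg-bucket/gocb types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ScanResultItem is a hypothetical stand-in for the sg-bucket scan result
// type, extended with an Error field so decode failures reach consumers.
type ScanResultItem struct {
	ID    string
	Body  map[string]interface{}
	Error error
}

// decodeItem surfaces decode failures instead of silently dropping them.
func decodeItem(id string, raw []byte) ScanResultItem {
	item := ScanResultItem{ID: id}
	var body map[string]interface{}
	if err := json.Unmarshal(raw, &body); err != nil {
		// Attach the error so callers can detect undecodable content.
		item.Error = fmt.Errorf("decoding body for %q: %w", id, err)
		return item
	}
	item.Body = body
	return item
}

func main() {
	good := decodeItem("doc_a", []byte(`{"name":"alpha"}`))
	bad := decodeItem("doc_b", []byte(`not-json`))
	fmt.Println(good.Error == nil, bad.Error != nil) // true true
}
```

An alternative, also consistent with the cited codebase patterns, is to log the decode error at warn level and skip the item rather than carrying it in the result struct.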
Does gocb.RangeScan ignore tombstones? Do we care about tombstones for usage? If so, we should probably align the behavior in rosmar.
I would write a test to see what happens if there is a literal null body and a tombstone and then make sure rosmar and gocb are aligned in behavior.
```go
// AsRangeScanStore returns a RangeScanStore if the underlying dataStore implements and supports range scan operations.
func AsRangeScanStore(ds DataStore) (sgbucket.RangeScanStore, bool) {
	rss, ok := GetBaseDataStore(ds).(sgbucket.RangeScanStore)
```
The use of GetBaseDataStore(ds) appears unnecessary since LeakyDataStore implements the RangeScanStore interface (as shown in base/leaky_datastore.go:550-556 and the assertion at line 561). This is inconsistent with AsSubdocStore (line 471-473) which performs a direct type assertion. Since LeakyDataStore.Scan() properly delegates to the underlying datastore's Scan() method, using GetBaseDataStore bypasses this wrapper functionality. Consider changing line 478 to: rss, ok := ds.(sgbucket.RangeScanStore) to be consistent with the pattern and properly support wrapped datastores like LeakyDataStore.
Suggested change:

```diff
-	rss, ok := GetBaseDataStore(ds).(sgbucket.RangeScanStore)
+	rss, ok := ds.(sgbucket.RangeScanStore)
```
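The wrapper-bypass concern can be illustrated with stand-in types (the names here are hypothetical; `Scanner`, `baseStore`, and `leakyStore` merely mimic the roles of `sgbucket.RangeScanStore`, the base datastore, and `LeakyDataStore`):

```go
package main

import "fmt"

// Scanner is a stand-in for sgbucket.RangeScanStore.
type Scanner interface{ Scan() string }

// baseStore is a stand-in for the underlying datastore.
type baseStore struct{}

func (baseStore) Scan() string { return "base scan" }

// leakyStore is a stand-in for LeakyDataStore: it wraps a base store and
// adds behavior around the delegated call.
type leakyStore struct{ inner baseStore }

func (l leakyStore) Scan() string { return "leaky: " + l.inner.Scan() }

func main() {
	wrapped := leakyStore{inner: baseStore{}}

	// Asserting on the wrapper itself preserves its delegation behavior.
	if s, ok := interface{}(wrapped).(Scanner); ok {
		fmt.Println(s.Scan()) // leaky: base scan
	}

	// Asserting on the unwrapped base (as GetBaseDataStore would return)
	// bypasses the wrapper entirely.
	if s, ok := interface{}(wrapped.inner).(Scanner); ok {
		fmt.Println(s.Scan()) // base scan
	}
}
```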
```go
type gocbScanResultIterator struct {
	result  *gocb.ScanResult
	idsOnly bool
```
The idsOnly field in gocbScanResultIterator is stored but never used. The code uses item.IDOnly() directly from the gocb result item instead. Consider removing this unused field to keep the code clean and avoid confusion.
Agree with this comment.
Force-pushed from a11b407 to 5ee3a86 (Compare)
Wrap gocb.Collection.Scan() in the SG collection adapter, converting sgbucket types to gocb types with RawJSONTranscoder. Add AsRangeScanStore helper, LeakyDataStore passthrough, and IsSupported for CBS 7.5+. Includes dual-backend test covering full range, partial range, IDsOnly, sampling, empty range, prefix, and tombstone exclusion.
CBS range scan returns documents with binary flags (from SetRaw), so use RawBinaryTranscoder instead of the default JSONTranscoder. Sort results in tests since CBS scans across vbuckets without ordering.
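Since CBS streams scan results across vbuckets in no particular order, the tests sort collected IDs before comparing. A self-contained sketch of a `collectScanIDs`-style helper as used in the tests (the iterator types here are stand-ins, not the real sg-bucket types):

```go
package main

import (
	"fmt"
	"sort"
)

// scanItem and scanIterator are stand-ins for the sg-bucket scan result types.
type scanItem struct{ ID string }

type scanIterator interface {
	Next() *scanItem // returns nil when the scan is exhausted
	Close() error
}

// sliceIterator yields items from a fixed slice, simulating an unordered scan.
type sliceIterator struct {
	items []scanItem
	pos   int
}

func (it *sliceIterator) Next() *scanItem {
	if it.pos >= len(it.items) {
		return nil
	}
	item := &it.items[it.pos]
	it.pos++
	return item
}

func (it *sliceIterator) Close() error { return nil }

// collectScanIDs drains the iterator and returns the IDs sorted, so tests can
// compare against a fixed expected slice regardless of vbucket ordering.
func collectScanIDs(iter scanIterator) []string {
	defer iter.Close()
	var ids []string
	for item := iter.Next(); item != nil; item = iter.Next() {
		ids = append(ids, item.ID)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	iter := &sliceIterator{items: []scanItem{{"doc_c"}, {"doc_a"}, {"doc_b"}}}
	fmt.Println(collectScanIDs(iter)) // [doc_a doc_b doc_c]
}
```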
Force-pushed from ea06bac to 07ed2f8 (Compare)
```go
allDocIDs := []string{"doc_a", "doc_b", "doc_c", "doc_d", "doc_e"}

// CBS range scan may not immediately reflect recent writes (requires persistence).
```
Is this true? This seems perilous if so. Does this depend on the type of storage used by the backing bucket?
```go
if UnitTestUrlIsWalrus() {
	t.Skip("Requires Couchbase Server")
}
```
Suggested change:

```diff
 if UnitTestUrlIsWalrus() {
 	t.Skip("Requires Couchbase Server")
 }
+TestsRequireMobileRBAC(t)
```
```go
	t.Skip("Requires Couchbase Server")
}

SetUpTestLogging(t, LevelDebug, KeyAll)
```
I do not like this pattern for committed code, because gocb / rosmar logging is verbose.
```go
rbacSpec := BucketSpec{
	Server:     bucket.BucketSpec.Server,
	BucketName: bucketName,
	Auth: TestAuthenticator{
		Username:   rbacUsername,
		Password:   rbacPassword,
		BucketName: bucketName,
	},
	UseXattrs:       bucket.BucketSpec.UseXattrs,
	TLSSkipVerify:   bucket.BucketSpec.TLSSkipVerify,
	BucketOpTimeout: bucket.BucketSpec.BucketOpTimeout,
}
```
I would call getTestBucketSpec. Without this, if your cluster requires a TLS cert it would fail.
```go
var rbacBucket *GocbV2Bucket
require.EventuallyWithT(t, func(c *assert.CollectT) {
	var connectErr error
	rbacBucket, connectErr = GetGoCBv2Bucket(ctx, rbacSpec)
```
I don't think this is necessary to do in a retry loop, but I'm curious if so.
```go
	rbacDataStore = rbacBucket.DefaultDataStore()
} else {
	var dsErr error
	rbacDataStore, dsErr = rbacBucket.NamedDataStore(ScopeAndCollectionName{
		Scope:      adminDataStore.ScopeName(),
		Collection: adminDataStore.CollectionName(),
	})
	require.NoError(t, dsErr, "Failed to open named collection %s.%s on RBAC bucket",
		adminDataStore.ScopeName(), adminDataStore.CollectionName())
}
```
```go
ds := rbacBucket.GetSingleDataStore()
```
```go
rss, ok := AsRangeScanStore(rbacDataStore)
require.True(t, ok, "DataStore does not support range scan")
docs := map[string][]byte{
	"doc_a": []byte(`{"name":"alpha"}`),
	"doc_b": []byte(`{"name":"bravo"}`),
	"doc_c": []byte(`{"name":"charlie"}`),
	"doc_d": []byte(`{"name":"delta"}`),
	"doc_e": []byte(`{"name":"echo"}`),
}
for k, v := range docs {
	require.NoError(t, adminDataStore.SetRaw(k, 0, nil, v))
}

allDocIDs := []string{"doc_a", "doc_b", "doc_c", "doc_d", "doc_e"}

// Wait for all docs to be visible through the RBAC connection's range scan.
require.EventuallyWithT(t, func(c *assert.CollectT) {
	ids := collectScanIDs(t, rss, sgbucket.NewRangeScanForPrefix("doc_"), sgbucket.ScanOptions{IDsOnly: true})
	assert.Equal(c, allDocIDs, ids)
}, 30*time.Second, 500*time.Millisecond)

t.Run("FullRange", func(t *testing.T) {
	scan := sgbucket.NewRangeScanForPrefix("doc_")
	iter, err := rss.Scan(scan, sgbucket.ScanOptions{})
	require.NoError(t, err)
	defer func() { assert.NoError(t, iter.Close()) }()

	var ids []string
	for {
		item := iter.Next()
		if item == nil {
			break
		}
		ids = append(ids, item.ID)
		assert.NotNil(t, item.Body, "Expected body for key %s", item.ID)
		assert.NotZero(t, item.Cas, "Expected non-zero CAS for key %s", item.ID)
		assert.False(t, item.IDOnly)
	}
	sort.Strings(ids)
	require.Equal(t, allDocIDs, ids)
})

t.Run("PartialRange", func(t *testing.T) {
	scan := sgbucket.RangeScan{
		From: &sgbucket.ScanTerm{Term: "doc_b"},
		To:   &sgbucket.ScanTerm{Term: "doc_d", Exclusive: true},
	}
	ids := collectScanIDs(t, rss, scan, sgbucket.ScanOptions{})
	require.Equal(t, []string{"doc_b", "doc_c"}, ids)
})

t.Run("IDsOnly", func(t *testing.T) {
	scan := sgbucket.NewRangeScanForPrefix("doc_")
	iter, err := rss.Scan(scan, sgbucket.ScanOptions{IDsOnly: true})
	require.NoError(t, err)
	defer func() { assert.NoError(t, iter.Close()) }()

	var ids []string
	for {
		item := iter.Next()
		if item == nil {
			break
		}
		ids = append(ids, item.ID)
		assert.True(t, item.IDOnly)
		assert.Nil(t, item.Body, "Expected nil body for IDsOnly scan, key %s", item.ID)
	}
	sort.Strings(ids)
	require.Equal(t, allDocIDs, ids)
})
```
It feels worth parameterizing this whole test with the standard bucket connection above rather than duplicate.
```go
// Assert interface compliance:
var (
	_ sgbucket.DataStore      = &LeakyDataStore{}
	_ sgbucket.RangeScanStore = &LeakyDataStore{}
```
Would it make it easier if sgbucket.DataStore just implemented RangeScanStore, so we don't have to check for compliance everywhere?
Note: This was coded primarily using Opus 4.6 - with manual guidance/verification/review.
Adds a test that exercises KV range scan against both Couchbase Server (via gocb) and Rosmar through the sg-bucket interface.
Dependencies
Integration Tests
`^TestRangeScan$` — https://jenkins.sgwdev.com/job/SyncGatewayIntegration/300/