-
Notifications
You must be signed in to change notification settings - Fork 140
[3.3.2] CBG-5035: Add revcache value lock for synchronized delta generation #7912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -370,53 +370,72 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s | |||||
| return revision, nil | ||||||
| } | ||||||
|
|
||||||
| // GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, | ||||||
| // returns nil. | ||||||
| // GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil. | ||||||
| // Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously. | ||||||
| func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) { | ||||||
|
|
||||||
| if docID == "" || fromRevID == "" || toRevID == "" { | ||||||
| return nil, nil, nil | ||||||
| } | ||||||
|
|
||||||
| fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta) | ||||||
| initialFromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta) | ||||||
|
|
||||||
| // If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just | ||||||
| // return 404 missing to indicate that the body of the revision is no longer available. | ||||||
| // Delta can't be generated if we don't have the fromRevision body. | ||||||
| if fromRevision.Removed { | ||||||
| if initialFromRevision.Removed { | ||||||
| return nil, nil, ErrMissing | ||||||
| } | ||||||
|
|
||||||
| // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication | ||||||
| if fromRevision.Deleted { | ||||||
| // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication. | ||||||
| if initialFromRevision.Deleted { | ||||||
| return nil, nil, base.ErrDeltaSourceIsTombstone | ||||||
| } | ||||||
|
|
||||||
| // If both body and delta are not available for fromRevId, the delta can't be generated | ||||||
| if fromRevision.BodyBytes == nil && fromRevision.Delta == nil { | ||||||
| // Note: In 4.x this is achieved by a returned `err` value on the revisionCache Get call above | ||||||
| if initialFromRevision.BodyBytes == nil && initialFromRevision.Delta == nil { | ||||||
| return nil, nil, err | ||||||
| } | ||||||
|
|
||||||
| // If delta is found, check whether it is a delta for the toRevID we want | ||||||
| if fromRevision.Delta != nil { | ||||||
| if fromRevision.Delta.ToRevID == toRevID { | ||||||
| // If delta is found, check whether it is a delta for the toRevID we want. | ||||||
| if initialFromRevision.Delta != nil && initialFromRevision.Delta.ToRevID == toRevID { | ||||||
| isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory)) | ||||||
| if !isAuthorized { | ||||||
| return nil, &redactedBody, nil | ||||||
| } | ||||||
| db.dbStats().DeltaSync().DeltaCacheHit.Add(1) | ||||||
| return initialFromRevision.Delta, nil, nil | ||||||
| } | ||||||
|
|
||||||
| if initialFromRevision.BodyBytes != nil { | ||||||
| // Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result) | ||||||
|
||||||
| // Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result) | |
| // Acquire a delta lock to generate delta (ensuring only one toRev unmarshaling/diff for this fromRev and allow racing clients to share the result) |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing 'to' in 'need do it now' should be 'need to do it now'.
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing 'to' in 'need do it now' should be 'need to do it now'.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -541,6 +541,217 @@ func TestGetRemovalMultiChannel(t *testing.T) { | |
| require.Equal(t, bodyExpected, body) | ||
| } | ||
|
|
||
| // TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously. | ||
| // Ensures that only one delta is computed and others wait for the cached result. | ||
| // More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer. | ||
| func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) { | ||
| if !base.IsEnterpriseEdition() { | ||
| t.Skip("Delta sync only supported in EE") | ||
| } | ||
|
|
||
| tests := []struct { | ||
| name string | ||
| docSize int | ||
| concurrentClients int | ||
| }{ | ||
| { | ||
| name: "100KBDoc_100Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 100, | ||
| }, | ||
| { | ||
| name: "100KBDoc_1000Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 1000, | ||
| }, | ||
| { | ||
| name: "100KBDoc_5000Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 5000, | ||
| }, | ||
| { | ||
| name: "5MBDoc_10Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 10, | ||
| }, | ||
| { | ||
| name: "5MBDoc_100Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 100, | ||
| }, | ||
| { | ||
| name: "5MBDoc_1000Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 1000, | ||
| }, | ||
| { | ||
| name: "5MBDoc_5000Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 5000, | ||
| }, | ||
| } | ||
|
|
||
| for _, test := range tests { | ||
| t.Run(test.name, func(t *testing.T) { | ||
| db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) | ||
| defer db.Close(ctx) | ||
| collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) | ||
|
|
||
| docID := "doc1_" + test.name | ||
| rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) | ||
| require.NoError(t, err) | ||
| rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) | ||
| require.NoError(t, err) | ||
|
|
||
| wg := sync.WaitGroup{} | ||
| wg.Add(test.concurrentClients) | ||
| for i := 0; i < test.concurrentClients; i++ { | ||
| go func() { | ||
| defer wg.Done() | ||
| delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2) | ||
| require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2) | ||
| require.NotNil(t, delta) | ||
| }() | ||
| } | ||
| wg.Wait() | ||
|
|
||
| // ensure only 1 delta miss and the remaining used cache | ||
| deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value() | ||
| deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value() | ||
| assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses") | ||
| assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits") | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) { | ||
| if !base.IsEnterpriseEdition() { | ||
| b.Skip("Delta sync only supported in EE") | ||
| } | ||
|
|
||
| tests := []struct { | ||
| name string | ||
| docSize int | ||
| concurrentClients int | ||
| }{ | ||
| { | ||
| name: "100KBDoc_1Client", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 1, | ||
| }, | ||
| { | ||
| name: "100KBDoc_100Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 100, | ||
| }, | ||
| { | ||
| name: "100KBDoc_1000Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 1000, | ||
| }, | ||
| { | ||
| name: "100KBDoc_5000Clients", | ||
| docSize: 100 * 1024, | ||
| concurrentClients: 5000, | ||
| }, | ||
| { | ||
| name: "5MBDoc_1Client", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 1, | ||
| }, | ||
| { | ||
| name: "5MBDoc_10Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 10, | ||
| }, | ||
| { | ||
| name: "5MBDoc_100Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 100, | ||
| }, | ||
| { | ||
| name: "5MBDoc_1000Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 1000, | ||
| }, | ||
| { | ||
| name: "5MBDoc_5000Clients", | ||
| docSize: 5 * 1024 * 1024, | ||
| concurrentClients: 5000, | ||
| }, | ||
| } | ||
|
|
||
| for _, test := range tests { | ||
| b.Run(test.name, func(b *testing.B) { | ||
| db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) | ||
| defer db.Close(ctx) | ||
| collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) | ||
|
|
||
| docID := "doc1_" + test.name | ||
| rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) | ||
| rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) | ||
|
|
||
| for b.Loop() { | ||
| wg := sync.WaitGroup{} | ||
| wg.Add(test.concurrentClients) | ||
| for i := 0; i < test.concurrentClients; i++ { | ||
| go func() { | ||
| defer wg.Done() | ||
| _, _, _ = collection.GetDelta(ctx, docID, rev1, rev2) | ||
| }() | ||
| } | ||
| wg.Wait() | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) { | ||
| if !base.IsEnterpriseEdition() { | ||
| b.Skip("Delta sync only supported in EE") | ||
| } | ||
|
|
||
| tests := []struct { | ||
| name string | ||
| docSize int | ||
| }{ | ||
| { | ||
| name: "100KBDoc", | ||
| docSize: 100 * 1024, | ||
| }, | ||
| //{ | ||
| // name: "5MBDoc", | ||
| // docSize: 5 * 1024 * 1024, | ||
| //}, | ||
|
Comment on lines
+722
to
+725
|
||
| } | ||
|
|
||
| for _, test := range tests { | ||
| b.Run(test.name, func(b *testing.B) { | ||
| db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) | ||
| defer db.Close(ctx) | ||
| collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) | ||
|
|
||
| // run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache | ||
| const numDocs = 1000 | ||
| benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs) | ||
| for i := range numDocs { | ||
| docID := fmt.Sprintf("%s_doc_%d", test.name, i) | ||
| rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) | ||
| require.NoError(b, err) | ||
| rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) | ||
| require.NoError(b, err) | ||
| benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2}) | ||
| } | ||
|
|
||
| for b.Loop() { | ||
| for _, doc := range benchDocs { | ||
| _, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // Test delta sync behavior when the fromRevision is a channel removal. | ||
| func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) { | ||
| db, ctx := setupTestDB(t) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.