Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,27 +447,27 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, maxHistory, nil)
}

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil.
// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {

if docID == "" || fromRev == "" || toRev == "" {
return nil, nil, nil
}
var fromRevision DocumentRevision

var initialFromRevision DocumentRevision
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable name initialFromRevision is unclear about its purpose in the double-check locking pattern. Consider renaming it to cachedFromRevision or unlocked_FromRevision to make it clearer that this is the revision fetched before acquiring the lock.

Copilot uses AI. Check for mistakes.
var fromRevVrs Version
fromRevIsCV := !base.IsRevTreeID(fromRev)
if fromRevIsCV {
fromRevVrs, err = ParseVersion(fromRev)
if err != nil {
return nil, nil, err
}
fromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
initialFromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
} else {
fromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
initialFromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
Expand All @@ -476,39 +476,60 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
// return 404 missing to indicate that the body of the revision is no longer available.
// Delta can't be generated if we don't have the fromRevision body.
if fromRevision.Removed {
if initialFromRevision.Removed {
return nil, nil, ErrMissing
}

// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication
if fromRevision.Deleted {
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication.
if initialFromRevision.Deleted {
return nil, nil, base.ErrDeltaSourceIsTombstone
}

// If both body and delta are not available for fromRevId, the delta can't be generated
if fromRevision.BodyBytes == nil && fromRevision.Delta == nil {
return nil, nil, err
// If delta is found, check whether it is a delta for the toRevID we want.
if initialFromRevision.Delta != nil && (initialFromRevision.Delta.ToCV == toRev || initialFromRevision.Delta.ToRevID == toRev) {
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, initialFromRevision.CV, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
return initialFromRevision.Delta, nil, nil
}

// If delta is found, check whether it is a delta for the toRevID we want
if fromRevision.Delta != nil {
if fromRevision.Delta.ToCV == toRev || fromRevision.Delta.ToRevID == toRev {
if initialFromRevision.BodyBytes != nil {
// Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result)
initialFromRevision.RevCacheValueDeltaLock.Lock()
defer initialFromRevision.RevCacheValueDeltaLock.Unlock()

isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevision.CV, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
// fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta)
var fromRevisionForDiff DocumentRevision
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable name fromRevisionForDiff could be more descriptive. Consider renaming it to lockedFromRevision or fromRevisionAfterLock to indicate this is the revision checked after acquiring the lock in the double-check pattern.

Copilot uses AI. Check for mistakes.
if fromRevIsCV {
fromRevisionForDiff, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
} else {
fromRevisionForDiff, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
}

// Check if another writer beat us to generating the delta and caching it.
if fromRevisionForDiff.Delta != nil && (fromRevisionForDiff.Delta.ToCV == toRev || fromRevisionForDiff.Delta.ToRevID == toRev) {
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevisionForDiff.CV, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}

// Case 2a. 'some rev' is the rev we're interested in - return the delta
// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1)
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
return fromRevision.Delta, nil, nil
return fromRevisionForDiff.Delta, nil, nil
}
}

// Delta is unavailable, but the body is available.
if fromRevision.BodyBytes != nil {
// Delta can't be generated - returning nil forces a full body replication for toRevId.
if fromRevisionForDiff.BodyBytes == nil {
return nil, nil, nil
}

// Need to generate delta and cache it for others.
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
var toRevision DocumentRevision
if !base.IsRevTreeID(toRev) {
Expand Down Expand Up @@ -537,7 +558,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, ErrMissing
}

// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta.
if deleted {
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
if fromRevIsCV {
Expand All @@ -548,25 +569,25 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return &revCacheDelta, nil, nil
}

// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now.
var fromBodyCopy Body
if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil {
if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil {
return nil, nil, err
}

// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now.
var toBodyCopy Body
if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil {
return nil, nil, err
}

// If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing
// so that the resulting delta also contains attachment metadata changes
if fromRevision.Attachments != nil {
// so that the resulting delta also contains attachment metadata changes.
if fromRevisionForDiff.Attachments != nil {
// the delta library does not handle deltas in non builtin types,
// so we need the map[string]interface{} type conversion here
DeleteAttachmentVersion(fromRevision.Attachments)
fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevision.Attachments)
DeleteAttachmentVersion(fromRevisionForDiff.Attachments)
fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevisionForDiff.Attachments)
}

var toRevAttStorageMeta []AttachmentStorageMeta
Expand All @@ -583,7 +604,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
}
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)

// Write the newly calculated delta back into the cache before returning
// Write the newly calculated delta back into the cache before returning.
if fromRevIsCV {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
Expand All @@ -592,6 +613,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return &revCacheDelta, nil, nil
}

// If both body and delta are not available for fromRevId, the delta can't be generated.
return nil, nil, nil
}

Expand Down
211 changes: 211 additions & 0 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,217 @@ func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) {
assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes)
}

// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously.
// Ensures that only one delta is computed and others wait for the cached result.
// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer.
func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) {
if !base.IsEnterpriseEdition() {
t.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
concurrentClients int
}{
{
name: "100KBDoc_100Clients",
docSize: 100 * 1024,
concurrentClients: 100,
},
{
name: "100KBDoc_1000Clients",
docSize: 100 * 1024,
concurrentClients: 1000,
},
{
name: "100KBDoc_5000Clients",
docSize: 100 * 1024,
concurrentClients: 5000,
},
{
name: "5MBDoc_10Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 10,
},
{
name: "5MBDoc_100Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 100,
},
{
name: "5MBDoc_1000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 1000,
},
{
name: "5MBDoc_5000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 5000,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

docID := "doc1_" + test.name
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
require.NoError(t, err)
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(test.concurrentClients)
for i := 0; i < test.concurrentClients; i++ {
go func() {
defer wg.Done()
delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2)
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2)
require.NotNil(t, delta)
}()
}
wg.Wait()

// ensure only 1 delta miss and the remaining used cache
deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value()
deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value()
assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses")
assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits")
})
}
}

func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) {
if !base.IsEnterpriseEdition() {
b.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
concurrentClients int
}{
{
name: "100KBDoc_1Client",
docSize: 100 * 1024,
concurrentClients: 1,
},
{
name: "100KBDoc_100Clients",
docSize: 100 * 1024,
concurrentClients: 100,
},
{
name: "100KBDoc_1000Clients",
docSize: 100 * 1024,
concurrentClients: 1000,
},
{
name: "100KBDoc_5000Clients",
docSize: 100 * 1024,
concurrentClients: 5000,
},
{
name: "5MBDoc_1Client",
docSize: 5 * 1024 * 1024,
concurrentClients: 1,
},
{
name: "5MBDoc_10Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 10,
},
{
name: "5MBDoc_100Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 100,
},
{
name: "5MBDoc_1000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 1000,
},
{
name: "5MBDoc_5000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 5000,
},
}

for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)

docID := "doc1_" + test.name
rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})

for b.Loop() {
wg := sync.WaitGroup{}
wg.Add(test.concurrentClients)
for i := 0; i < test.concurrentClients; i++ {
go func() {
defer wg.Done()
_, _, _ = collection.GetDelta(ctx, docID, rev1, rev2)
}()
}
wg.Wait()
}
})
}
}

func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) {
if !base.IsEnterpriseEdition() {
b.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
}{
{
name: "100KBDoc",
docSize: 100 * 1024,
},
//{
// name: "5MBDoc",
// docSize: 5 * 1024 * 1024,
//},
Comment on lines +1161 to +1164
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the commented-out test case for '5MBDoc'. If this configuration is intended for future use, consider documenting why it's commented out or remove it entirely to keep the code clean.

Suggested change
//{
// name: "5MBDoc",
// docSize: 5 * 1024 * 1024,
//},

Copilot uses AI. Check for mistakes.
}

for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)

// run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache
const numDocs = 1000
benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs)
for i := range numDocs {
docID := fmt.Sprintf("%s_doc_%d", test.name, i)
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
require.NoError(b, err)
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
require.NoError(b, err)
benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2})
}

for b.Loop() {
for _, doc := range benchDocs {
_, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2)
}
}
})
}
}

// Test delta sync behavior when the fromRevision is a channel removal.
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
if !base.IsEnterpriseEdition() {
Expand Down
Loading