Skip to content
Merged
86 changes: 54 additions & 32 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,27 +447,27 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, maxHistory, nil)
}

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil.
// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {

if docID == "" || fromRev == "" || toRev == "" {
return nil, nil, nil
}
var fromRevision DocumentRevision

var initialFromRevision DocumentRevision
var fromRevVrs Version
fromRevIsCV := !base.IsRevTreeID(fromRev)
if fromRevIsCV {
fromRevVrs, err = ParseVersion(fromRev)
if err != nil {
return nil, nil, err
}
fromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
initialFromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
} else {
fromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
initialFromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
Expand All @@ -476,39 +476,60 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
// return 404 missing to indicate that the body of the revision is no longer available.
// Delta can't be generated if we don't have the fromRevision body.
if fromRevision.Removed {
if initialFromRevision.Removed {
return nil, nil, ErrMissing
}

// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication
if fromRevision.Deleted {
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication.
if initialFromRevision.Deleted {
return nil, nil, base.ErrDeltaSourceIsTombstone
}

// If both body and delta are not available for fromRevId, the delta can't be generated
if fromRevision.BodyBytes == nil && fromRevision.Delta == nil {
return nil, nil, err
// If delta is found, check whether it is a delta for the toRevID we want.
if initialFromRevision.Delta != nil && (initialFromRevision.Delta.ToCV == toRev || initialFromRevision.Delta.ToRevID == toRev) {
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, initialFromRevision.CV, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
return initialFromRevision.Delta, nil, nil
}

// If delta is found, check whether it is a delta for the toRevID we want
if fromRevision.Delta != nil {
if fromRevision.Delta.ToCV == toRev || fromRevision.Delta.ToRevID == toRev {
if initialFromRevision.BodyBytes != nil {
// Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result)
initialFromRevision.RevCacheValueDeltaLock.Lock()
defer initialFromRevision.RevCacheValueDeltaLock.Unlock()

isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevision.CV, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
// fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta)
var fromRevisionForDiff DocumentRevision
if fromRevIsCV {
fromRevisionForDiff, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
} else {
fromRevisionForDiff, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
}

// Check if another writer beat us to generating the delta and caching it.
if fromRevisionForDiff.Delta != nil && (fromRevisionForDiff.Delta.ToCV == toRev || fromRevisionForDiff.Delta.ToRevID == toRev) {
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevisionForDiff.CV, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}

// Case 2a. 'some rev' is the rev we're interested in - return the delta
// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1)
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
return fromRevision.Delta, nil, nil
return fromRevisionForDiff.Delta, nil, nil
}
}

// Delta is unavailable, but the body is available.
if fromRevision.BodyBytes != nil {
// Delta can't be generated - returning nil forces a full body replication for toRevId.
if fromRevisionForDiff.BodyBytes == nil {
return nil, nil, nil
}

// Need to generate delta and cache it for others.
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
var toRevision DocumentRevision
if !base.IsRevTreeID(toRev) {
Expand Down Expand Up @@ -537,7 +558,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, ErrMissing
}

// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta.
if deleted {
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
if fromRevIsCV {
Expand All @@ -548,25 +569,25 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return &revCacheDelta, nil, nil
}

// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now.
var fromBodyCopy Body
if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil {
if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil {
return nil, nil, err
}

// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now.
var toBodyCopy Body
if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil {
return nil, nil, err
}

// If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing
// so that the resulting delta also contains attachment metadata changes
if fromRevision.Attachments != nil {
// so that the resulting delta also contains attachment metadata changes.
if fromRevisionForDiff.Attachments != nil {
// the delta library does not handle deltas in non builtin types,
// so we need the map[string]interface{} type conversion here
DeleteAttachmentVersion(fromRevision.Attachments)
fromBodyCopy[BodyAttachments] = map[string]any(fromRevision.Attachments)
DeleteAttachmentVersion(fromRevisionForDiff.Attachments)
fromBodyCopy[BodyAttachments] = map[string]any(fromRevisionForDiff.Attachments)
}

var toRevAttStorageMeta []AttachmentStorageMeta
Expand All @@ -583,7 +604,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
}
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)

// Write the newly calculated delta back into the cache before returning
// Write the newly calculated delta back into the cache before returning.
if fromRevIsCV {
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
} else {
Expand All @@ -592,6 +613,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return &revCacheDelta, nil, nil
}

// If both body and delta are not available for fromRevId, the delta can't be generated.
return nil, nil, nil
}

Expand Down
211 changes: 211 additions & 0 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,217 @@ func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) {
assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes)
}

// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously.
// Ensures that only one delta is computed and others wait for the cached result.
// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer.
func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) {
if !base.IsEnterpriseEdition() {
t.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
concurrentClients int
}{
{
name: "100KBDoc_100Clients",
docSize: 100 * 1024,
concurrentClients: 100,
},
{
name: "100KBDoc_1000Clients",
docSize: 100 * 1024,
concurrentClients: 1000,
},
{
name: "100KBDoc_5000Clients",
docSize: 100 * 1024,
concurrentClients: 5000,
},
{
name: "5MBDoc_10Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 10,
},
{
name: "5MBDoc_100Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 100,
},
{
name: "5MBDoc_1000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 1000,
},
{
name: "5MBDoc_5000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 5000,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

docID := "doc1_" + test.name
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
require.NoError(t, err)
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(test.concurrentClients)
for i := 0; i < test.concurrentClients; i++ {
go func() {
defer wg.Done()
delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2)
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2)
require.NotNil(t, delta)
}()
}
wg.Wait()

// ensure only 1 delta miss and the remaining used cache
deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value()
deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value()
assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses")
assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits")
})
}
}

func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) {
if !base.IsEnterpriseEdition() {
b.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
concurrentClients int
}{
{
name: "100KBDoc_1Client",
docSize: 100 * 1024,
concurrentClients: 1,
},
{
name: "100KBDoc_100Clients",
docSize: 100 * 1024,
concurrentClients: 100,
},
{
name: "100KBDoc_1000Clients",
docSize: 100 * 1024,
concurrentClients: 1000,
},
{
name: "100KBDoc_5000Clients",
docSize: 100 * 1024,
concurrentClients: 5000,
},
{
name: "5MBDoc_1Client",
docSize: 5 * 1024 * 1024,
concurrentClients: 1,
},
{
name: "5MBDoc_10Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 10,
},
{
name: "5MBDoc_100Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 100,
},
{
name: "5MBDoc_1000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 1000,
},
{
name: "5MBDoc_5000Clients",
docSize: 5 * 1024 * 1024,
concurrentClients: 5000,
},
}

for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)

docID := "doc1_" + test.name
rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})

for b.Loop() {
wg := sync.WaitGroup{}
wg.Add(test.concurrentClients)
for i := 0; i < test.concurrentClients; i++ {
go func() {
defer wg.Done()
_, _, _ = collection.GetDelta(ctx, docID, rev1, rev2)
}()
}
wg.Wait()
}
})
}
}

func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) {
if !base.IsEnterpriseEdition() {
b.Skip("Delta sync only supported in EE")
}

tests := []struct {
name string
docSize int
}{
{
name: "100KBDoc",
docSize: 100 * 1024,
},
//{
// name: "5MBDoc",
// docSize: 5 * 1024 * 1024,
//},
}

for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)

// run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache
const numDocs = 1000
benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs)
for i := range numDocs {
docID := fmt.Sprintf("%s_doc_%d", test.name, i)
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
require.NoError(b, err)
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
require.NoError(b, err)
benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2})
}

for b.Loop() {
for _, doc := range benchDocs {
_, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2)
}
}
})
}
}

// Test delta sync behavior when the fromRevision is a channel removal.
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
if !base.IsEnterpriseEdition() {
Expand Down
Loading