diff --git a/db/crud.go b/db/crud.go index 2084c42a59..e57d502216 100644 --- a/db/crud.go +++ b/db/crud.go @@ -447,14 +447,14 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, maxHistory, nil) } -// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, -// returns nil. +// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil. +// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously. func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) { - if docID == "" || fromRev == "" || toRev == "" { return nil, nil, nil } - var fromRevision DocumentRevision + + var initialFromRevision DocumentRevision var fromRevVrs Version fromRevIsCV := !base.IsRevTreeID(fromRev) if fromRevIsCV { @@ -462,12 +462,12 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR if err != nil { return nil, nil, err } - fromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta) + initialFromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta) if err != nil { return nil, nil, err } } else { - fromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta) + initialFromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta) if err != nil { return nil, nil, err } @@ -476,39 +476,60 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR // If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just // return 404 missing to indicate that the body of the revision is no longer available. // Delta can't be generated if we don't have the fromRevision body. - if fromRevision.Removed { + if initialFromRevision.Removed { return nil, nil, ErrMissing } - // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication - if fromRevision.Deleted { + // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication. + if initialFromRevision.Deleted { return nil, nil, base.ErrDeltaSourceIsTombstone } - // If both body and delta are not available for fromRevId, the delta can't be generated - if fromRevision.BodyBytes == nil && fromRevision.Delta == nil { - return nil, nil, err + // If delta is found, check whether it is a delta for the toRevID we want. + if initialFromRevision.Delta != nil && (initialFromRevision.Delta.ToCV == toRev || initialFromRevision.Delta.ToRevID == toRev) { + isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, initialFromRevision.CV, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory)) + if !isAuthorized { + return nil, &redactedBody, nil + } + db.dbStats().DeltaSync().DeltaCacheHit.Add(1) + return initialFromRevision.Delta, nil, nil } - // If delta is found, check whether it is a delta for the toRevID we want - if fromRevision.Delta != nil { - if fromRevision.Delta.ToCV == toRev || fromRevision.Delta.ToRevID == toRev { + if initialFromRevision.BodyBytes != nil { + // Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result) + initialFromRevision.RevCacheValueDeltaLock.Lock() + defer initialFromRevision.RevCacheValueDeltaLock.Unlock() - isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevision.CV, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory)) + // fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta) + var fromRevisionForDiff DocumentRevision + if fromRevIsCV { + fromRevisionForDiff, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta) + if err != nil { + return nil, nil, err + } + } else { + fromRevisionForDiff, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta) + if err != nil { + return nil, nil, err + } + } + + // Check if another writer beat us to generating the delta and caching it. + if fromRevisionForDiff.Delta != nil && (fromRevisionForDiff.Delta.ToCV == toRev || fromRevisionForDiff.Delta.ToRevID == toRev) { + isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevisionForDiff.CV, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory)) if !isAuthorized { return nil, &redactedBody, nil } - - // Case 2a. 'some rev' is the rev we're interested in - return the delta - // db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1) db.dbStats().DeltaSync().DeltaCacheHit.Add(1) - return fromRevision.Delta, nil, nil + return fromRevisionForDiff.Delta, nil, nil } - } - // Delta is unavailable, but the body is available. - if fromRevision.BodyBytes != nil { + // Delta can't be generated - returning nil forces a full body replication for toRevId. + if fromRevisionForDiff.BodyBytes == nil { + return nil, nil, nil + } + // Need to generate delta and cache it for others. db.dbStats().DeltaSync().DeltaCacheMiss.Add(1) var toRevision DocumentRevision if !base.IsRevTreeID(toRev) { @@ -537,7 +558,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR return nil, nil, ErrMissing } - // If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta + // If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta. if deleted { revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil) if fromRevIsCV { @@ -548,25 +569,25 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR return &revCacheDelta, nil, nil } - // We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now + // We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now. var fromBodyCopy Body - if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil { + if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil { return nil, nil, err } - // We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now + // We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now. var toBodyCopy Body if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil { return nil, nil, err } // If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing - // so that the resulting delta also contains attachment metadata changes - if fromRevision.Attachments != nil { + // so that the resulting delta also contains attachment metadata changes. + if fromRevisionForDiff.Attachments != nil { // the delta library does not handle deltas in non builtin types, // so we need the map[string]interface{} type conversion here - DeleteAttachmentVersion(fromRevision.Attachments) - fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevision.Attachments) + DeleteAttachmentVersion(fromRevisionForDiff.Attachments) + fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevisionForDiff.Attachments) } var toRevAttStorageMeta []AttachmentStorageMeta @@ -583,7 +604,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR } revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta) - // Write the newly calculated delta back into the cache before returning + // Write the newly calculated delta back into the cache before returning. if fromRevIsCV { db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta) } else { @@ -592,6 +613,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR return &revCacheDelta, nil, nil } + // If both body and delta are not available for fromRevId, the delta can't be generated. return nil, nil, nil } diff --git a/db/database_test.go b/db/database_test.go index aa0a232baa..940cc2c770 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -980,6 +980,217 @@ func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) { assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes) } +// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously. +// Ensures that only one delta is computed and others wait for the cached result. +// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer. +func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) { + if !base.IsEnterpriseEdition() { + t.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + concurrentClients int + }{ + { + name: "100KBDoc_100Clients", + docSize: 100 * 1024, + concurrentClients: 100, + }, + { + name: "100KBDoc_1000Clients", + docSize: 100 * 1024, + concurrentClients: 1000, + }, + { + name: "100KBDoc_5000Clients", + docSize: 100 * 1024, + concurrentClients: 5000, + }, + { + name: "5MBDoc_10Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 10, + }, + { + name: "5MBDoc_100Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 100, + }, + { + name: "5MBDoc_1000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 1000, + }, + { + name: "5MBDoc_5000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 5000, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + docID := "doc1_" + test.name + rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + require.NoError(t, err) + rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(test.concurrentClients) + for i := 0; i < test.concurrentClients; i++ { + go func() { + defer wg.Done() + delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2) + require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2) + require.NotNil(t, delta) + }() + } + wg.Wait() + + // ensure only 1 delta miss and the remaining used cache + deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value() + deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value() + assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses") + assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits") + }) + } +} + +func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) { + if !base.IsEnterpriseEdition() { + b.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + concurrentClients int + }{ + { + name: "100KBDoc_1Client", + docSize: 100 * 1024, + concurrentClients: 1, + }, + { + name: "100KBDoc_100Clients", + docSize: 100 * 1024, + concurrentClients: 100, + }, + { + name: "100KBDoc_1000Clients", + docSize: 100 * 1024, + concurrentClients: 1000, + }, + { + name: "100KBDoc_5000Clients", + docSize: 100 * 1024, + concurrentClients: 5000, + }, + { + name: "5MBDoc_1Client", + docSize: 5 * 1024 * 1024, + concurrentClients: 1, + }, + { + name: "5MBDoc_10Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 10, + }, + { + name: "5MBDoc_100Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 100, + }, + { + name: "5MBDoc_1000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 1000, + }, + { + name: "5MBDoc_5000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 5000, + }, + } + + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) + + docID := "doc1_" + test.name + rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + + for b.Loop() { + wg := sync.WaitGroup{} + wg.Add(test.concurrentClients) + for i := 0; i < test.concurrentClients; i++ { + go func() { + defer wg.Done() + _, _, _ = collection.GetDelta(ctx, docID, rev1, rev2) + }() + } + wg.Wait() + } + }) + } +} + +func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) { + if !base.IsEnterpriseEdition() { + b.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + }{ + { + name: "100KBDoc", + docSize: 100 * 1024, + }, + //{ + // name: "5MBDoc", + // docSize: 5 * 1024 * 1024, + //}, + } + + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) + + // run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache + const numDocs = 1000 + benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs) + for i := range numDocs { + docID := fmt.Sprintf("%s_doc_%d", test.name, i) + rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + require.NoError(b, err) + rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + require.NoError(b, err) + benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2}) + } + + for b.Loop() { + for _, doc := range benchDocs { + _, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2) + } + } + }) + } +} + // Test delta sync behavior when the fromRevision is a channel removal. func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) { if !base.IsEnterpriseEdition() { diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index b32f838ec2..a669cc5e82 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -12,6 +12,7 @@ package db import ( "context" + "sync" "time" "github.com/couchbase/sync_gateway/base" @@ -206,17 +207,18 @@ type DocumentRevision struct { DocID string RevID string // BodyBytes contains the raw document, with no special properties. - BodyBytes []byte - History Revisions - Channels base.Set - Expiry *time.Time - Attachments AttachmentsMeta - Delta *RevisionDelta - Deleted bool - Removed bool // True if the revision is a removal. - MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too - CV *Version - HlvHistory string + BodyBytes []byte + History Revisions + Channels base.Set + Expiry *time.Time + Attachments AttachmentsMeta + Delta *RevisionDelta + RevCacheValueDeltaLock *sync.Mutex // shared mutex for the revcache value to avoid concurrent delta generation + Deleted bool + Removed bool // True if the revision is a removal. + MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too + CV *Version + HlvHistory string } // MutableBody returns a deep copy of the given document revision as a plain body (without any special properties) diff --git a/db/revision_cache_lru.go b/db/revision_cache_lru.go index 61a2bd3487..7c9a77ccf3 100644 --- a/db/revision_cache_lru.go +++ b/db/revision_cache_lru.go @@ -136,6 +136,7 @@ type revCacheValue struct { itemBytes atomic.Int64 collectionID uint32 canEvict atomic.Bool + deltaLock sync.Mutex // synchronizes GetDelta across multiple clients for each fromRevision } // Creates a revision cache with the given capacity and an optional loader function. @@ -762,22 +763,23 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache func (value *revCacheValue) asDocumentRevision(delta *RevisionDelta) (DocumentRevision, error) { docRev := DocumentRevision{ - DocID: value.id, - RevID: value.revID, - BodyBytes: value.bodyBytes, - History: value.history, - Channels: value.channels, - Expiry: value.expiry, - Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments - Deleted: value.deleted, - Removed: value.removed, - HlvHistory: value.hlvHistory, + DocID: value.id, + RevID: value.revID, + BodyBytes: value.bodyBytes, + History: value.history, + Channels: value.channels, + Expiry: value.expiry, + Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments + Delta: delta, + Deleted: value.deleted, + Removed: value.removed, + HlvHistory: value.hlvHistory, + RevCacheValueDeltaLock: &value.deltaLock, } // only populate CV if we have a value if !value.cv.IsEmpty() { docRev.CV = &value.cv } - docRev.Delta = delta return docRev, value.err }