diff --git a/db/crud.go b/db/crud.go index 9ed3319b29..b71e69e5f5 100644 --- a/db/crud.go +++ b/db/crud.go @@ -370,53 +370,72 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s return revision, nil } -// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, -// returns nil. +// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil. +// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously. func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) { - if docID == "" || fromRevID == "" || toRevID == "" { return nil, nil, nil } - fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta) + initialFromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta) // If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just // return 404 missing to indicate that the body of the revision is no longer available. // Delta can't be generated if we don't have the fromRevision body. - if fromRevision.Removed { + if initialFromRevision.Removed { return nil, nil, ErrMissing } - // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication - if fromRevision.Deleted { + // If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication. + if initialFromRevision.Deleted { return nil, nil, base.ErrDeltaSourceIsTombstone } // If both body and delta are not available for fromRevId, the delta can't be generated - if fromRevision.BodyBytes == nil && fromRevision.Delta == nil { + // Note: In 4.x this is achieved by a returned `err` value on the revisionCache Get call above + if initialFromRevision.BodyBytes == nil && initialFromRevision.Delta == nil { return nil, nil, err } - // If delta is found, check whether it is a delta for the toRevID we want - if fromRevision.Delta != nil { - if fromRevision.Delta.ToRevID == toRevID { + // If delta is found, check whether it is a delta for the toRevID we want. + if initialFromRevision.Delta != nil && initialFromRevision.Delta.ToRevID == toRevID { + isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory)) + if !isAuthorized { + return nil, &redactedBody, nil + } + db.dbStats().DeltaSync().DeltaCacheHit.Add(1) + return initialFromRevision.Delta, nil, nil + } + + if initialFromRevision.BodyBytes != nil { + // Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result) + initialFromRevision.RevCacheValueDeltaLock.Lock() + defer initialFromRevision.RevCacheValueDeltaLock.Unlock() - isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory)) + // fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta) + fromRevisionForDiff, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta) + if err != nil { + return nil, nil, err + } + + // Check if another writer beat us to generating the delta and caching it. + if fromRevisionForDiff.Delta != nil && fromRevisionForDiff.Delta.ToRevID == toRevID { + isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory)) if !isAuthorized { return nil, &redactedBody, nil } // Case 2a. 'some rev' is the rev we're interested in - return the delta - // db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1) db.dbStats().DeltaSync().DeltaCacheHit.Add(1) - return fromRevision.Delta, nil, nil + return fromRevisionForDiff.Delta, nil, nil } - } - // Delta is unavailable, but the body is available. - if fromRevision.BodyBytes != nil { + // Delta can't be generated - returning nil forces a full body replication for toRevId. + if fromRevisionForDiff.BodyBytes == nil { + return nil, nil, nil + } - // db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheMisses, 1) + // Need to generate delta and cache it for others. db.dbStats().DeltaSync().DeltaCacheMiss.Add(1) toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheIncludeDelta) if err != nil { @@ -433,32 +452,32 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR return nil, nil, ErrMissing } - // If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta + // If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta. if deleted { revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRevID, toRevision, deleted, nil) db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta) return &revCacheDelta, nil, nil } - // We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now + // We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now. var fromBodyCopy Body - if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil { + if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil { return nil, nil, err } - // We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now + // We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now. var toBodyCopy Body if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil { return nil, nil, err } // If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing - // so that the resulting delta also contains attachment metadata changes - if fromRevision.Attachments != nil { + // so that the resulting delta also contains attachment metadata changes. + if fromRevisionForDiff.Attachments != nil { // the delta library does not handle deltas in non builtin types, // so we need the map[string]interface{} type conversion here - DeleteAttachmentVersion(fromRevision.Attachments) - fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevision.Attachments) + DeleteAttachmentVersion(fromRevisionForDiff.Attachments) + fromBodyCopy[BodyAttachments] = map[string]any(fromRevisionForDiff.Attachments) } var toRevAttStorageMeta []AttachmentStorageMeta @@ -475,11 +494,12 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR } revCacheDelta := newRevCacheDelta(deltaBytes, fromRevID, toRevision, deleted, toRevAttStorageMeta) - // Write the newly calculated delta back into the cache before returning + // Write the newly calculated delta back into the cache before returning. db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta) return &revCacheDelta, nil, nil } + // If both body and delta are not available for fromRevId, the delta can't be generated. return nil, nil, nil } diff --git a/db/database_test.go b/db/database_test.go index 2507e00e49..54afa675f9 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -541,6 +541,217 @@ func TestGetRemovalMultiChannel(t *testing.T) { require.Equal(t, bodyExpected, body) } +// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously. +// Ensures that only one delta is computed and others wait for the cached result. +// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer. +func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) { + if !base.IsEnterpriseEdition() { + t.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + concurrentClients int + }{ + { + name: "100KBDoc_100Clients", + docSize: 100 * 1024, + concurrentClients: 100, + }, + { + name: "100KBDoc_1000Clients", + docSize: 100 * 1024, + concurrentClients: 1000, + }, + { + name: "100KBDoc_5000Clients", + docSize: 100 * 1024, + concurrentClients: 5000, + }, + { + name: "5MBDoc_10Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 10, + }, + { + name: "5MBDoc_100Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 100, + }, + { + name: "5MBDoc_1000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 1000, + }, + { + name: "5MBDoc_5000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 5000, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + docID := "doc1_" + test.name + rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + require.NoError(t, err) + rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + require.NoError(t, err) + + wg := sync.WaitGroup{} + wg.Add(test.concurrentClients) + for i := 0; i < test.concurrentClients; i++ { + go func() { + defer wg.Done() + delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2) + require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2) + require.NotNil(t, delta) + }() + } + wg.Wait() + + // ensure only 1 delta miss and the remaining used cache + deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value() + deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value() + assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses") + assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits") + }) + } +} + +func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) { + if !base.IsEnterpriseEdition() { + b.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + concurrentClients int + }{ + { + name: "100KBDoc_1Client", + docSize: 100 * 1024, + concurrentClients: 1, + }, + { + name: "100KBDoc_100Clients", + docSize: 100 * 1024, + concurrentClients: 100, + }, + { + name: "100KBDoc_1000Clients", + docSize: 100 * 1024, + concurrentClients: 1000, + }, + { + name: "100KBDoc_5000Clients", + docSize: 100 * 1024, + concurrentClients: 5000, + }, + { + name: "5MBDoc_1Client", + docSize: 5 * 1024 * 1024, + concurrentClients: 1, + }, + { + name: "5MBDoc_10Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 10, + }, + { + name: "5MBDoc_100Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 100, + }, + { + name: "5MBDoc_1000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 1000, + }, + { + name: "5MBDoc_5000Clients", + docSize: 5 * 1024 * 1024, + concurrentClients: 5000, + }, + } + + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) + + docID := "doc1_" + test.name + rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + + for b.Loop() { + wg := sync.WaitGroup{} + wg.Add(test.concurrentClients) + for i := 0; i < test.concurrentClients; i++ { + go func() { + defer wg.Done() + _, _, _ = collection.GetDelta(ctx, docID, rev1, rev2) + }() + } + wg.Wait() + } + }) + } +} + +func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) { + if !base.IsEnterpriseEdition() { + b.Skip("Delta sync only supported in EE") + } + + tests := []struct { + name string + docSize int + }{ + { + name: "100KBDoc", + docSize: 100 * 1024, + }, + //{ + // name: "5MBDoc", + // docSize: 5 * 1024 * 1024, + //}, + } + + for _, test := range tests { + b.Run(test.name, func(b *testing.B) { + db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}}) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db) + + // run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache + const numDocs = 1000 + benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs) + for i := range numDocs { + docID := fmt.Sprintf("%s_doc_%d", test.name, i) + rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)}) + require.NoError(b, err) + rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1}) + require.NoError(b, err) + benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2}) + } + + for b.Loop() { + for _, doc := range benchDocs { + _, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2) + } + } + }) + } +} + // Test delta sync behavior when the fromRevision is a channel removal. func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) { db, ctx := setupTestDB(t) diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index 10fcac166e..8e6c2a4e4c 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -12,6 +12,7 @@ package db import ( "context" + "sync" "time" "github.com/couchbase/sync_gateway/base" @@ -164,15 +165,16 @@ type DocumentRevision struct { DocID string RevID string // BodyBytes contains the raw document, with no special properties. - BodyBytes []byte - History Revisions - Channels base.Set - Expiry *time.Time - Attachments AttachmentsMeta - Delta *RevisionDelta - Deleted bool - Removed bool // True if the revision is a removal. - MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too + BodyBytes []byte + History Revisions + Channels base.Set + Expiry *time.Time + Attachments AttachmentsMeta + Delta *RevisionDelta + RevCacheValueDeltaLock *sync.Mutex // shared mutex for the revcache value to avoid concurrent delta generation + Deleted bool + Removed bool // True if the revision is a removal. + MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too } // MutableBody returns a deep copy of the given document revision as a plain body (without any special properties) diff --git a/db/revision_cache_lru.go b/db/revision_cache_lru.go index 50fb0d494c..6048a74fca 100644 --- a/db/revision_cache_lru.go +++ b/db/revision_cache_lru.go @@ -110,6 +110,7 @@ type revCacheValue struct { deleted bool removed bool itemBytes atomic.Int64 + deltaLock sync.Mutex // synchronizes GetDelta across multiple clients for each fromRevision } // Creates a revision cache with the given capacity and an optional loader function. @@ -410,17 +411,18 @@ func (value *revCacheValue) load(ctx context.Context, backingStore RevisionCache func (value *revCacheValue) asDocumentRevision(delta *RevisionDelta) (DocumentRevision, error) { docRev := DocumentRevision{ - DocID: value.key.DocID, - RevID: value.key.RevID, - BodyBytes: value.bodyBytes, - History: value.history, - Channels: value.channels, - Expiry: value.expiry, - Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments - Deleted: value.deleted, - Removed: value.removed, - } - docRev.Delta = delta + DocID: value.key.DocID, + RevID: value.key.RevID, + BodyBytes: value.bodyBytes, + History: value.history, + Channels: value.channels, + Expiry: value.expiry, + Attachments: value.attachments.ShallowCopy(), // Avoid caller mutating the stored attachments + Deleted: value.deleted, + Removed: value.removed, + RevCacheValueDeltaLock: &value.deltaLock, + Delta: delta, + } return docRev, value.err }