Skip to content

Commit 6ed11ec

Browse files
authored
[3.3.2] CBG-5035: Add revcache value lock for synchronized delta generation (#7912)
1 parent 8e78b5a commit 6ed11ec

File tree

4 files changed

+282
-47
lines changed

4 files changed

+282
-47
lines changed

db/crud.go

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -370,53 +370,72 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
370370
return revision, nil
371371
}
372372

373-
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
374-
// returns nil.
373+
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil.
374+
// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously.
375375
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
376-
377376
if docID == "" || fromRevID == "" || toRevID == "" {
378377
return nil, nil, nil
379378
}
380379

381-
fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta)
380+
initialFromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta)
382381

383382
// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
384383
// return 404 missing to indicate that the body of the revision is no longer available.
385384
// Delta can't be generated if we don't have the fromRevision body.
386-
if fromRevision.Removed {
385+
if initialFromRevision.Removed {
387386
return nil, nil, ErrMissing
388387
}
389388

390-
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication
391-
if fromRevision.Deleted {
389+
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication.
390+
if initialFromRevision.Deleted {
392391
return nil, nil, base.ErrDeltaSourceIsTombstone
393392
}
394393

395394
// If both body and delta are not available for fromRevId, the delta can't be generated
396-
if fromRevision.BodyBytes == nil && fromRevision.Delta == nil {
395+
// Note: In 4.x this is achieved by a returned `err` value on the revisionCache Get call above
396+
if initialFromRevision.BodyBytes == nil && initialFromRevision.Delta == nil {
397397
return nil, nil, err
398398
}
399399

400-
// If delta is found, check whether it is a delta for the toRevID we want
401-
if fromRevision.Delta != nil {
402-
if fromRevision.Delta.ToRevID == toRevID {
400+
// If delta is found, check whether it is a delta for the toRevID we want.
401+
if initialFromRevision.Delta != nil && initialFromRevision.Delta.ToRevID == toRevID {
402+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory))
403+
if !isAuthorized {
404+
return nil, &redactedBody, nil
405+
}
406+
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
407+
return initialFromRevision.Delta, nil, nil
408+
}
409+
410+
if initialFromRevision.BodyBytes != nil {
411+
// Acquire a delta lock to generate delta (ensuring only one toRev unmarshalling/diff for this fromRev and allow racing clients to share the result)
412+
initialFromRevision.RevCacheValueDeltaLock.Lock()
413+
defer initialFromRevision.RevCacheValueDeltaLock.Unlock()
403414

404-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
415+
// fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta)
416+
fromRevisionForDiff, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta)
417+
if err != nil {
418+
return nil, nil, err
419+
}
420+
421+
// Check if another writer beat us to generating the delta and caching it.
422+
if fromRevisionForDiff.Delta != nil && fromRevisionForDiff.Delta.ToRevID == toRevID {
423+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory))
405424
if !isAuthorized {
406425
return nil, &redactedBody, nil
407426
}
408427

409428
// Case 2a. 'some rev' is the rev we're interested in - return the delta
410-
// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1)
411429
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
412-
return fromRevision.Delta, nil, nil
430+
return fromRevisionForDiff.Delta, nil, nil
413431
}
414-
}
415432

416-
// Delta is unavailable, but the body is available.
417-
if fromRevision.BodyBytes != nil {
433+
// Delta can't be generated - returning nil forces a full body replication for toRevId.
434+
if fromRevisionForDiff.BodyBytes == nil {
435+
return nil, nil, nil
436+
}
418437

419-
// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheMisses, 1)
438+
// Need to generate delta and cache it for others.
420439
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
421440
toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheIncludeDelta)
422441
if err != nil {
@@ -433,32 +452,32 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
433452
return nil, nil, ErrMissing
434453
}
435454

436-
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
455+
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta.
437456
if deleted {
438457
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRevID, toRevision, deleted, nil)
439458
db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta)
440459
return &revCacheDelta, nil, nil
441460
}
442461

443-
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now
462+
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so we need to do it now.
444463
var fromBodyCopy Body
445-
if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil {
464+
if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil {
446465
return nil, nil, err
447466
}
448467

449-
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now
468+
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so we need to do it now.
450469
var toBodyCopy Body
451470
if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil {
452471
return nil, nil, err
453472
}
454473

455474
// If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing
456-
// so that the resulting delta also contains attachment metadata changes
457-
if fromRevision.Attachments != nil {
475+
// so that the resulting delta also contains attachment metadata changes.
476+
if fromRevisionForDiff.Attachments != nil {
458477
// the delta library does not handle deltas in non builtin types,
459478
// so we need the map[string]interface{} type conversion here
460-
DeleteAttachmentVersion(fromRevision.Attachments)
461-
fromBodyCopy[BodyAttachments] = map[string]interface{}(fromRevision.Attachments)
479+
DeleteAttachmentVersion(fromRevisionForDiff.Attachments)
480+
fromBodyCopy[BodyAttachments] = map[string]any(fromRevisionForDiff.Attachments)
462481
}
463482

464483
var toRevAttStorageMeta []AttachmentStorageMeta
@@ -475,11 +494,12 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
475494
}
476495
revCacheDelta := newRevCacheDelta(deltaBytes, fromRevID, toRevision, deleted, toRevAttStorageMeta)
477496

478-
// Write the newly calculated delta back into the cache before returning
497+
// Write the newly calculated delta back into the cache before returning.
479498
db.revisionCache.UpdateDelta(ctx, docID, fromRevID, revCacheDelta)
480499
return &revCacheDelta, nil, nil
481500
}
482501

502+
// If both body and delta are not available for fromRevId, the delta can't be generated.
483503
return nil, nil, nil
484504
}
485505

db/database_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,217 @@ func TestGetRemovalMultiChannel(t *testing.T) {
541541
require.Equal(t, bodyExpected, body)
542542
}
543543

544+
// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously.
545+
// Ensures that only one delta is computed and others wait for the cached result.
546+
// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer.
547+
func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) {
548+
if !base.IsEnterpriseEdition() {
549+
t.Skip("Delta sync only supported in EE")
550+
}
551+
552+
tests := []struct {
553+
name string
554+
docSize int
555+
concurrentClients int
556+
}{
557+
{
558+
name: "100KBDoc_100Clients",
559+
docSize: 100 * 1024,
560+
concurrentClients: 100,
561+
},
562+
{
563+
name: "100KBDoc_1000Clients",
564+
docSize: 100 * 1024,
565+
concurrentClients: 1000,
566+
},
567+
{
568+
name: "100KBDoc_5000Clients",
569+
docSize: 100 * 1024,
570+
concurrentClients: 5000,
571+
},
572+
{
573+
name: "5MBDoc_10Clients",
574+
docSize: 5 * 1024 * 1024,
575+
concurrentClients: 10,
576+
},
577+
{
578+
name: "5MBDoc_100Clients",
579+
docSize: 5 * 1024 * 1024,
580+
concurrentClients: 100,
581+
},
582+
{
583+
name: "5MBDoc_1000Clients",
584+
docSize: 5 * 1024 * 1024,
585+
concurrentClients: 1000,
586+
},
587+
{
588+
name: "5MBDoc_5000Clients",
589+
docSize: 5 * 1024 * 1024,
590+
concurrentClients: 5000,
591+
},
592+
}
593+
594+
for _, test := range tests {
595+
t.Run(test.name, func(t *testing.T) {
596+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
597+
defer db.Close(ctx)
598+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
599+
600+
docID := "doc1_" + test.name
601+
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
602+
require.NoError(t, err)
603+
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
604+
require.NoError(t, err)
605+
606+
wg := sync.WaitGroup{}
607+
wg.Add(test.concurrentClients)
608+
for i := 0; i < test.concurrentClients; i++ {
609+
go func() {
610+
defer wg.Done()
611+
delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2)
612+
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2)
613+
require.NotNil(t, delta)
614+
}()
615+
}
616+
wg.Wait()
617+
618+
// ensure only 1 delta miss and the remaining used cache
619+
deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value()
620+
deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value()
621+
assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses")
622+
assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits")
623+
})
624+
}
625+
}
626+
627+
func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) {
628+
if !base.IsEnterpriseEdition() {
629+
b.Skip("Delta sync only supported in EE")
630+
}
631+
632+
tests := []struct {
633+
name string
634+
docSize int
635+
concurrentClients int
636+
}{
637+
{
638+
name: "100KBDoc_1Client",
639+
docSize: 100 * 1024,
640+
concurrentClients: 1,
641+
},
642+
{
643+
name: "100KBDoc_100Clients",
644+
docSize: 100 * 1024,
645+
concurrentClients: 100,
646+
},
647+
{
648+
name: "100KBDoc_1000Clients",
649+
docSize: 100 * 1024,
650+
concurrentClients: 1000,
651+
},
652+
{
653+
name: "100KBDoc_5000Clients",
654+
docSize: 100 * 1024,
655+
concurrentClients: 5000,
656+
},
657+
{
658+
name: "5MBDoc_1Client",
659+
docSize: 5 * 1024 * 1024,
660+
concurrentClients: 1,
661+
},
662+
{
663+
name: "5MBDoc_10Clients",
664+
docSize: 5 * 1024 * 1024,
665+
concurrentClients: 10,
666+
},
667+
{
668+
name: "5MBDoc_100Clients",
669+
docSize: 5 * 1024 * 1024,
670+
concurrentClients: 100,
671+
},
672+
{
673+
name: "5MBDoc_1000Clients",
674+
docSize: 5 * 1024 * 1024,
675+
concurrentClients: 1000,
676+
},
677+
{
678+
name: "5MBDoc_5000Clients",
679+
docSize: 5 * 1024 * 1024,
680+
concurrentClients: 5000,
681+
},
682+
}
683+
684+
for _, test := range tests {
685+
b.Run(test.name, func(b *testing.B) {
686+
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
687+
defer db.Close(ctx)
688+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)
689+
690+
docID := "doc1_" + test.name
691+
rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
692+
rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
693+
694+
for b.Loop() {
695+
wg := sync.WaitGroup{}
696+
wg.Add(test.concurrentClients)
697+
for i := 0; i < test.concurrentClients; i++ {
698+
go func() {
699+
defer wg.Done()
700+
_, _, _ = collection.GetDelta(ctx, docID, rev1, rev2)
701+
}()
702+
}
703+
wg.Wait()
704+
}
705+
})
706+
}
707+
}
708+
709+
func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) {
710+
if !base.IsEnterpriseEdition() {
711+
b.Skip("Delta sync only supported in EE")
712+
}
713+
714+
tests := []struct {
715+
name string
716+
docSize int
717+
}{
718+
{
719+
name: "100KBDoc",
720+
docSize: 100 * 1024,
721+
},
722+
//{
723+
// name: "5MBDoc",
724+
// docSize: 5 * 1024 * 1024,
725+
//},
726+
}
727+
728+
for _, test := range tests {
729+
b.Run(test.name, func(b *testing.B) {
730+
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
731+
defer db.Close(ctx)
732+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)
733+
734+
// run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache
735+
const numDocs = 1000
736+
benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs)
737+
for i := range numDocs {
738+
docID := fmt.Sprintf("%s_doc_%d", test.name, i)
739+
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
740+
require.NoError(b, err)
741+
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
742+
require.NoError(b, err)
743+
benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2})
744+
}
745+
746+
for b.Loop() {
747+
for _, doc := range benchDocs {
748+
_, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2)
749+
}
750+
}
751+
})
752+
}
753+
}
754+
544755
// Test delta sync behavior when the fromRevision is a channel removal.
545756
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
546757
db, ctx := setupTestDB(t)

db/revision_cache_interface.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package db
1212

1313
import (
1414
"context"
15+
"sync"
1516
"time"
1617

1718
"github.com/couchbase/sync_gateway/base"
@@ -164,15 +165,16 @@ type DocumentRevision struct {
164165
DocID string
165166
RevID string
166167
// BodyBytes contains the raw document, with no special properties.
167-
BodyBytes []byte
168-
History Revisions
169-
Channels base.Set
170-
Expiry *time.Time
171-
Attachments AttachmentsMeta
172-
Delta *RevisionDelta
173-
Deleted bool
174-
Removed bool // True if the revision is a removal.
175-
MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too
168+
BodyBytes []byte
169+
History Revisions
170+
Channels base.Set
171+
Expiry *time.Time
172+
Attachments AttachmentsMeta
173+
Delta *RevisionDelta
174+
RevCacheValueDeltaLock *sync.Mutex // shared mutex for the revcache value to avoid concurrent delta generation
175+
Deleted bool
176+
Removed bool // True if the revision is a removal.
177+
MemoryBytes int64 // storage of the doc rev bytes measurement, includes size of delta when present too
176178
}
177179

178180
// MutableBody returns a deep copy of the given document revision as a plain body (without any special properties)

0 commit comments

Comments
 (0)