Skip to content

Commit f175922

Browse files
authored
CBG-5032 Add revcache value lock for synchronized delta generation (#7903)
1 parent d61ca43 commit f175922

File tree

4 files changed

+291
-54
lines changed

4 files changed

+291
-54
lines changed

db/crud.go

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -447,27 +447,27 @@ func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, c
447447
return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, maxHistory, nil)
448448
}
449449

450-
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
451-
// returns nil.
450+
// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated, returns nil.
451+
// Delta generation is synchronized per fromRev via a shared revision cache value lock to avoid multiple clients generating the same delta simultaneously.
452452
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRev, toRev string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
453-
454453
if docID == "" || fromRev == "" || toRev == "" {
455454
return nil, nil, nil
456455
}
457-
var fromRevision DocumentRevision
456+
457+
var initialFromRevision DocumentRevision
458458
var fromRevVrs Version
459459
fromRevIsCV := !base.IsRevTreeID(fromRev)
460460
if fromRevIsCV {
461461
fromRevVrs, err = ParseVersion(fromRev)
462462
if err != nil {
463463
return nil, nil, err
464464
}
465-
fromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
465+
initialFromRevision, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
466466
if err != nil {
467467
return nil, nil, err
468468
}
469469
} else {
470-
fromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
470+
initialFromRevision, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
471471
if err != nil {
472472
return nil, nil, err
473473
}
@@ -476,39 +476,60 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
476476
// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
477477
// return 404 missing to indicate that the body of the revision is no longer available.
478478
// Delta can't be generated if we don't have the fromRevision body.
479-
if fromRevision.Removed {
479+
if initialFromRevision.Removed {
480480
return nil, nil, ErrMissing
481481
}
482482

483-
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication
484-
if fromRevision.Deleted {
483+
// If the fromRevision was a tombstone, then return error to tell delta sync to send full body replication.
484+
if initialFromRevision.Deleted {
485485
return nil, nil, base.ErrDeltaSourceIsTombstone
486486
}
487487

488-
// If both body and delta are not available for fromRevId, the delta can't be generated
489-
if fromRevision.BodyBytes == nil && fromRevision.Delta == nil {
490-
return nil, nil, err
488+
// If delta is found, check whether it is a delta for the toRevID we want.
489+
if initialFromRevision.Delta != nil && (initialFromRevision.Delta.ToCV == toRev || initialFromRevision.Delta.ToRevID == toRev) {
490+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, initialFromRevision.CV, initialFromRevision.Delta.ToChannels, initialFromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, initialFromRevision.Delta.RevisionHistory))
491+
if !isAuthorized {
492+
return nil, &redactedBody, nil
493+
}
494+
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
495+
return initialFromRevision.Delta, nil, nil
491496
}
492497

493-
// If delta is found, check whether it is a delta for the toRevID we want
494-
if fromRevision.Delta != nil {
495-
if fromRevision.Delta.ToCV == toRev || fromRevision.Delta.ToRevID == toRev {
498+
if initialFromRevision.BodyBytes != nil {
499+
// Acquire a delta lock to generate the delta (ensuring only one toRev unmarshalling/diff for this fromRev, and allowing racing clients to share the result).
500+
initialFromRevision.RevCacheValueDeltaLock.Lock()
501+
defer initialFromRevision.RevCacheValueDeltaLock.Unlock()
496502

497-
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevision.CV, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
503+
// fromRevisionForDiff is a version of the fromRevision that is guarded by the delta lock that we will use to generate the delta (or check again for a newly cached delta)
504+
var fromRevisionForDiff DocumentRevision
505+
if fromRevIsCV {
506+
fromRevisionForDiff, err = db.revisionCache.GetWithCV(ctx, docID, &fromRevVrs, RevCacheIncludeDelta)
507+
if err != nil {
508+
return nil, nil, err
509+
}
510+
} else {
511+
fromRevisionForDiff, err = db.revisionCache.GetWithRev(ctx, docID, fromRev, RevCacheIncludeDelta)
512+
if err != nil {
513+
return nil, nil, err
514+
}
515+
}
516+
517+
// Check if another writer beat us to generating the delta and caching it.
518+
if fromRevisionForDiff.Delta != nil && (fromRevisionForDiff.Delta.ToCV == toRev || fromRevisionForDiff.Delta.ToRevID == toRev) {
519+
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRev, fromRevisionForDiff.CV, fromRevisionForDiff.Delta.ToChannels, fromRevisionForDiff.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevisionForDiff.Delta.RevisionHistory))
498520
if !isAuthorized {
499521
return nil, &redactedBody, nil
500522
}
501-
502-
// Case 2a. 'some rev' is the rev we're interested in - return the delta
503-
// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheHits, 1)
504523
db.dbStats().DeltaSync().DeltaCacheHit.Add(1)
505-
return fromRevision.Delta, nil, nil
524+
return fromRevisionForDiff.Delta, nil, nil
506525
}
507-
}
508526

509-
// Delta is unavailable, but the body is available.
510-
if fromRevision.BodyBytes != nil {
527+
// Delta can't be generated - returning nil forces a full body replication for toRevId.
528+
if fromRevisionForDiff.BodyBytes == nil {
529+
return nil, nil, nil
530+
}
511531

532+
// Need to generate delta and cache it for others.
512533
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
513534
var toRevision DocumentRevision
514535
if !base.IsRevTreeID(toRev) {
@@ -537,7 +558,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
537558
return nil, nil, ErrMissing
538559
}
539560

540-
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta
561+
// If the revision we're generating a delta to is a tombstone, mark it as such and don't bother generating a delta.
541562
if deleted {
542563
revCacheDelta := newRevCacheDelta([]byte(base.EmptyDocument), fromRev, toRevision, deleted, nil)
543564
if fromRevIsCV {
@@ -548,25 +569,25 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
548569
return &revCacheDelta, nil, nil
549570
}
550571

551-
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so need do it now
572+
// We didn't unmarshal fromBody earlier (in case we could get by with just the delta), so we need to do it now.
552573
var fromBodyCopy Body
553-
if err := fromBodyCopy.Unmarshal(fromRevision.BodyBytes); err != nil {
574+
if err := fromBodyCopy.Unmarshal(fromRevisionForDiff.BodyBytes); err != nil {
554575
return nil, nil, err
555576
}
556577

557-
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so need do it now
578+
// We didn't unmarshal toBody earlier (in case we could get by with just the delta), so we need to do it now.
558579
var toBodyCopy Body
559580
if err := toBodyCopy.Unmarshal(toRevision.BodyBytes); err != nil {
560581
return nil, nil, err
561582
}
562583

563584
// If attachments have changed between these revisions, we'll stamp the metadata into the bodies before diffing
564-
// so that the resulting delta also contains attachment metadata changes
565-
if fromRevision.Attachments != nil {
585+
// so that the resulting delta also contains attachment metadata changes.
586+
if fromRevisionForDiff.Attachments != nil {
566587
// the delta library does not handle deltas in non builtin types,
567588
// so we need the map[string]interface{} type conversion here
568-
DeleteAttachmentVersion(fromRevision.Attachments)
569-
fromBodyCopy[BodyAttachments] = map[string]any(fromRevision.Attachments)
589+
DeleteAttachmentVersion(fromRevisionForDiff.Attachments)
590+
fromBodyCopy[BodyAttachments] = map[string]any(fromRevisionForDiff.Attachments)
570591
}
571592

572593
var toRevAttStorageMeta []AttachmentStorageMeta
@@ -583,7 +604,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
583604
}
584605
revCacheDelta := newRevCacheDelta(deltaBytes, fromRev, toRevision, deleted, toRevAttStorageMeta)
585606

586-
// Write the newly calculated delta back into the cache before returning
607+
// Write the newly calculated delta back into the cache before returning.
587608
if fromRevIsCV {
588609
db.revisionCache.UpdateDeltaCV(ctx, docID, &fromRevVrs, revCacheDelta)
589610
} else {
@@ -592,6 +613,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
592613
return &revCacheDelta, nil, nil
593614
}
594615

616+
// If both body and delta are not available for fromRevId, the delta can't be generated.
595617
return nil, nil, nil
596618
}
597619

db/database_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,217 @@ func TestDeltaSyncWhenFromRevIsLegacyRevTreeID(t *testing.T) {
980980
assert.Equal(t, []byte(`{"bar":[],"quux":"fuzz"}`), delta.DeltaBytes)
981981
}
982982

983+
// TestDeltaSyncConcurrentClientCachePopulation tests delta sync behavior when multiple goroutines request the same delta simultaneously.
984+
// Ensures that only one delta is computed and others wait for the cached result.
985+
// More instances of duplicated delta computation will be seen for larger documents since the delta computation takes longer.
986+
func TestDeltaSyncConcurrentClientCachePopulation(t *testing.T) {
987+
if !base.IsEnterpriseEdition() {
988+
t.Skip("Delta sync only supported in EE")
989+
}
990+
991+
tests := []struct {
992+
name string
993+
docSize int
994+
concurrentClients int
995+
}{
996+
{
997+
name: "100KBDoc_100Clients",
998+
docSize: 100 * 1024,
999+
concurrentClients: 100,
1000+
},
1001+
{
1002+
name: "100KBDoc_1000Clients",
1003+
docSize: 100 * 1024,
1004+
concurrentClients: 1000,
1005+
},
1006+
{
1007+
name: "100KBDoc_5000Clients",
1008+
docSize: 100 * 1024,
1009+
concurrentClients: 5000,
1010+
},
1011+
{
1012+
name: "5MBDoc_10Clients",
1013+
docSize: 5 * 1024 * 1024,
1014+
concurrentClients: 10,
1015+
},
1016+
{
1017+
name: "5MBDoc_100Clients",
1018+
docSize: 5 * 1024 * 1024,
1019+
concurrentClients: 100,
1020+
},
1021+
{
1022+
name: "5MBDoc_1000Clients",
1023+
docSize: 5 * 1024 * 1024,
1024+
concurrentClients: 1000,
1025+
},
1026+
{
1027+
name: "5MBDoc_5000Clients",
1028+
docSize: 5 * 1024 * 1024,
1029+
concurrentClients: 5000,
1030+
},
1031+
}
1032+
1033+
for _, test := range tests {
1034+
t.Run(test.name, func(t *testing.T) {
1035+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
1036+
defer db.Close(ctx)
1037+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
1038+
1039+
docID := "doc1_" + test.name
1040+
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
1041+
require.NoError(t, err)
1042+
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
1043+
require.NoError(t, err)
1044+
1045+
wg := sync.WaitGroup{}
1046+
wg.Add(test.concurrentClients)
1047+
for i := 0; i < test.concurrentClients; i++ {
1048+
go func() {
1049+
defer wg.Done()
1050+
delta, _, err := collection.GetDelta(ctx, docID, rev1, rev2)
1051+
require.NoErrorf(t, err, "Error getting delta for doc %q from rev %q to %q", docID, rev1, rev2)
1052+
require.NotNil(t, delta)
1053+
}()
1054+
}
1055+
wg.Wait()
1056+
1057+
// ensure only 1 delta miss and the remaining used cache
1058+
deltaCacheHits := db.DbStats.DeltaSync().DeltaCacheHit.Value()
1059+
deltaCacheMisses := db.DbStats.DeltaSync().DeltaCacheMiss.Value()
1060+
assert.Equal(t, int64(1), deltaCacheMisses, "Unexpected number of delta cache misses")
1061+
assert.Equal(t, int64(test.concurrentClients-1), deltaCacheHits, "Unexpected number of delta cache hits")
1062+
})
1063+
}
1064+
}
1065+
1066+
func BenchmarkDeltaSyncConcurrentClientCachePopulation(b *testing.B) {
1067+
if !base.IsEnterpriseEdition() {
1068+
b.Skip("Delta sync only supported in EE")
1069+
}
1070+
1071+
tests := []struct {
1072+
name string
1073+
docSize int
1074+
concurrentClients int
1075+
}{
1076+
{
1077+
name: "100KBDoc_1Client",
1078+
docSize: 100 * 1024,
1079+
concurrentClients: 1,
1080+
},
1081+
{
1082+
name: "100KBDoc_100Clients",
1083+
docSize: 100 * 1024,
1084+
concurrentClients: 100,
1085+
},
1086+
{
1087+
name: "100KBDoc_1000Clients",
1088+
docSize: 100 * 1024,
1089+
concurrentClients: 1000,
1090+
},
1091+
{
1092+
name: "100KBDoc_5000Clients",
1093+
docSize: 100 * 1024,
1094+
concurrentClients: 5000,
1095+
},
1096+
{
1097+
name: "5MBDoc_1Client",
1098+
docSize: 5 * 1024 * 1024,
1099+
concurrentClients: 1,
1100+
},
1101+
{
1102+
name: "5MBDoc_10Clients",
1103+
docSize: 5 * 1024 * 1024,
1104+
concurrentClients: 10,
1105+
},
1106+
{
1107+
name: "5MBDoc_100Clients",
1108+
docSize: 5 * 1024 * 1024,
1109+
concurrentClients: 100,
1110+
},
1111+
{
1112+
name: "5MBDoc_1000Clients",
1113+
docSize: 5 * 1024 * 1024,
1114+
concurrentClients: 1000,
1115+
},
1116+
{
1117+
name: "5MBDoc_5000Clients",
1118+
docSize: 5 * 1024 * 1024,
1119+
concurrentClients: 5000,
1120+
},
1121+
}
1122+
1123+
for _, test := range tests {
1124+
b.Run(test.name, func(b *testing.B) {
1125+
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
1126+
defer db.Close(ctx)
1127+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)
1128+
1129+
docID := "doc1_" + test.name
1130+
rev1, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
1131+
rev2, _, _ := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
1132+
1133+
for b.Loop() {
1134+
wg := sync.WaitGroup{}
1135+
wg.Add(test.concurrentClients)
1136+
for i := 0; i < test.concurrentClients; i++ {
1137+
go func() {
1138+
defer wg.Done()
1139+
_, _, _ = collection.GetDelta(ctx, docID, rev1, rev2)
1140+
}()
1141+
}
1142+
wg.Wait()
1143+
}
1144+
})
1145+
}
1146+
}
1147+
1148+
func BenchmarkDeltaSyncSingleClientCachePopulation(b *testing.B) {
1149+
if !base.IsEnterpriseEdition() {
1150+
b.Skip("Delta sync only supported in EE")
1151+
}
1152+
1153+
tests := []struct {
1154+
name string
1155+
docSize int
1156+
}{
1157+
{
1158+
name: "100KBDoc",
1159+
docSize: 100 * 1024,
1160+
},
1161+
//{
1162+
// name: "5MBDoc",
1163+
// docSize: 5 * 1024 * 1024,
1164+
//},
1165+
}
1166+
1167+
for _, test := range tests {
1168+
b.Run(test.name, func(b *testing.B) {
1169+
db, ctx := SetupTestDBWithOptions(b, DatabaseContextOptions{DeltaSyncOptions: DeltaSyncOptions{Enabled: true, RevMaxAgeSeconds: 300}})
1170+
defer db.Close(ctx)
1171+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, b, db)
1172+
1173+
// run benchmark over 1000 non-cached deltas since repeated invocations of b.Loop would hit the cache
1174+
const numDocs = 1000
1175+
benchDocs := make([]struct{ docID, rev1, rev2 string }, 0, numDocs)
1176+
for i := range numDocs {
1177+
docID := fmt.Sprintf("%s_doc_%d", test.name, i)
1178+
rev1, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "bar": "buzz", "quux": strings.Repeat("a", test.docSize)})
1179+
require.NoError(b, err)
1180+
rev2, _, err := collection.Put(ctx, docID, Body{"foo": "bar", "quux": strings.Repeat("b", test.docSize), BodyRev: rev1})
1181+
require.NoError(b, err)
1182+
benchDocs = append(benchDocs, struct{ docID, rev1, rev2 string }{docID: docID, rev1: rev1, rev2: rev2})
1183+
}
1184+
1185+
for b.Loop() {
1186+
for _, doc := range benchDocs {
1187+
_, _, _ = collection.GetDelta(ctx, doc.docID, doc.rev1, doc.rev2)
1188+
}
1189+
}
1190+
})
1191+
}
1192+
}
1193+
9831194
// Test delta sync behavior when the fromRevision is a channel removal.
9841195
func TestDeltaSyncWhenFromRevIsChannelRemoval(t *testing.T) {
9851196
if !base.IsEnterpriseEdition() {

0 commit comments

Comments
 (0)