From 2ed2ebaaefa78af1c254547f643188c34c64f0ef Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 16:09:06 -0400 Subject: [PATCH 01/23] start --- internal/verifier/list_namespaces.go | 14 ++++++++++---- internal/verifier/migration_verifier.go | 16 ++++++++++++++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index 7737c04e..fb01129f 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -5,6 +5,7 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mmongo" "github.com/10gen/migration-verifier/mslices" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -62,10 +63,15 @@ func ListAllUserNamespaces( for _, dbName := range dbNames { db := client.Database(dbName) - filter := util.ExcludePrefixesQuery( - "name", - mslices.Of(ExcludedSystemCollPrefix), - ) + filter := bson.D{ + {"$or", []bson.D{ + util.ExcludePrefixesQuery( + "name", + mslices.Of(ExcludedSystemCollPrefix), + ), + mmongo.StartsWithAgg("$name", "system.buckets."), + }}, + } specifications, err := db.ListCollectionSpecifications(ctx, filter, options.ListCollections().SetNameOnly(true)) if err != nil { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 72bfdc32..fb661b0d 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -822,8 +822,20 @@ func (verifier *Verifier) compareCollectionSpecifications( } } - // Don't compare view data; they have no data of their own. - canCompareData := srcSpec.Type != "view" + var canCompareData bool + + switch srcSpec.Type { + case "collection": + canCompareData = true + case "view": + case "timeseries": + if !verifier.verifyAll { + return nil, false, fmt.Errorf("cannot verify time-series collection (%#q) under namespace filtering", srcNs) + } + default: + return nil, false, fmt.Errorf("unrecognized collection type (spec: %+v)", srcSpec) + } + // Do not compare data between capped and uncapped collections because the partitioning is different. canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped")) From eaca327a7a55e922690ba982724f1edc3a8487e1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 16:30:47 -0400 Subject: [PATCH 02/23] err wrap --- internal/verifier/check.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 5907beaa..1f284bb5 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -202,13 +202,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh err = verifier.verificationDatabase().Drop(ctx) if err != nil { verifier.mux.Unlock() - return err + return errors.Wrap(err, "dropping metadata") } } else { genOpt, err := verifier.readGeneration(ctx) if err != nil { verifier.mux.Unlock() - return err + return errors.Wrap(err, "reading generation from metadata") } if gen, has := genOpt.Get(); has { @@ -221,6 +221,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } } + verifier.logger.Info().Msg("Starting change streams.") + // Now that we’ve initialized verifier.generation we can // start the change stream readers. verifier.initializeChangeStreamReaders() @@ -230,7 +232,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh func(ctx context.Context, _ *retry.FuncInfo) error { err = verifier.AddMetaIndexes(ctx) if err != nil { - return err + return errors.Wrap(err, "adding metadata indexes") } err = verifier.doInMetaTransaction( From 54c767c7e13e2acf0d16842e768c752441208d1e Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 16:33:03 -0400 Subject: [PATCH 03/23] expr --- internal/verifier/list_namespaces.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index fb01129f..1e16bc4b 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -69,7 +69,9 @@ func ListAllUserNamespaces( "name", mslices.Of(ExcludedSystemCollPrefix), ), - mmongo.StartsWithAgg("$name", "system.buckets."), + bson.D{ + {"$expr", mmongo.StartsWithAgg("$name", "system.buckets.")}, + }, }}, } From fd3723cc89beb4800fa1307f0fed03d894dc571c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 21:00:33 -0400 Subject: [PATCH 04/23] save --- internal/verifier/list_namespaces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index 1e16bc4b..ab0060f3 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -69,7 +69,7 @@ func ListAllUserNamespaces( "name", mslices.Of(ExcludedSystemCollPrefix), ), - bson.D{ + { {"$expr", mmongo.StartsWithAgg("$name", "system.buckets.")}, }, }}, From 221b7a6e75f2c5869e33110f9a9c6e878f6177a0 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 23:37:03 -0400 Subject: [PATCH 05/23] revert & add --- internal/verifier/check.go | 88 +++++++++++++++++++++++-- internal/verifier/list_namespaces.go | 23 +++---- internal/verifier/migration_verifier.go | 1 + 3 files changed, 94 insertions(+), 18 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 1f284bb5..150a7856 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "golang.org/x/exp/slices" ) type GenerationStatus string @@ -459,7 +460,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error } isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx) if err != nil { - return err + return errors.Wrap(err, "creating primary task") } if !isPrimary { verifier.logger.Info().Msg("Primary task already existed; skipping setup") @@ -468,9 +469,15 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error if verifier.verifyAll { err := verifier.setupAllNamespaceList(ctx) if err != nil { - return err + return errors.Wrap(err, "creating namespace list") } } + + err = verifier.addTimeseriesBucketsToNamespaces(ctx) + if err != nil { + return errors.Wrap(err, "adding timeseries buckets to namespaces") + } + for _, src := range verifier.srcNamespaces { _, err := verifier.InsertCollectionVerificationTask(ctx, src) if err != nil { @@ -480,9 +487,9 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error src, ) } - } - verifier.gen0PendingCollectionTasks.Store(int32(len(verifier.srcNamespaces))) + verifier.gen0PendingCollectionTasks.Add(1) + } err = verifier.UpdatePrimaryTaskComplete(ctx) if err != nil { @@ -491,6 +498,79 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error return nil } +func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) error { + srcTimeseriesNamespaces, err := whichNamespacesAreTimeseries( + ctx, + verifier.srcClient, + mapset.NewSet(verifier.srcNamespaces...), + ) + if err != nil { + return errors.Wrap(err, "fetching timeseries namespaces") + } + + for _, srcNS := range slices.Clone(verifier.srcNamespaces) { + if !srcTimeseriesNamespaces.Contains(srcNS) { + continue + } + + dstNS, ok := verifier.nsMap.GetDstNamespace(srcNS) + if !ok { + return fmt.Errorf("found no dst namespace for %#q", srcNS) + } + + verifier.srcNamespaces = append( + verifier.srcNamespaces, + "system.buckets."+srcNS, + ) + + verifier.dstNamespaces = append( + verifier.dstNamespaces, + "system.buckets."+dstNS, + ) + } + + return nil +} + +func whichNamespacesAreTimeseries( + ctx context.Context, + client *mongo.Client, + namespaces mapset.Set[string], +) (mapset.Set[string], error) { + tsNamespaces := mapset.NewSet[string]() + + dbNamespaces := map[string]mapset.Set[string]{} + + for ns := range namespaces.Iter() { + db, coll := SplitNamespace(ns) + if set, exists := dbNamespaces[db]; exists { + set.Add(coll) + } else { + dbNamespaces[db] = mapset.NewSet(coll) + } + } + + for db, colls := range dbNamespaces { + specs, err := client.Database(db).ListCollectionSpecifications( + ctx, + bson.D{ + {"type", "timeseries"}, + {"name", bson.D{{"$in", colls.ToSlice()}}}, + }, + ) + + if err != nil { + return nil, errors.Wrapf(err, "listing %#q’s timeseries namespaces", db) + } + + for _, spec := range specs { + tsNamespaces.Add(db + "." + spec.Name) + } + } + + return tsNamespaces, nil +} + func FetchFailedAndIncompleteTasks( ctx context.Context, logger *logger.Logger, diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index ab0060f3..98389969 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -5,7 +5,6 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/util" - "github.com/10gen/migration-verifier/mmongo" "github.com/10gen/migration-verifier/mslices" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -28,8 +27,11 @@ var ( ExcludedSystemCollPrefix = "system." ) -// Lists all the user collections on a cluster. Unlike mongosync, we don't use the internal $listCatalog, since we need to -// work on old versions without that command. This means this does not run with read concern majority. +// ListAllUserNamespaces lists all the user collections on a cluster. +// Unlike mongosync, we don't use the internal $listCatalog, since we need to +// work on old versions without that command. +// +// This means this does *NOT* run with read concern majority. func ListAllUserNamespaces( ctx context.Context, logger *logger.Logger, @@ -63,17 +65,10 @@ func ListAllUserNamespaces( for _, dbName := range dbNames { db := client.Database(dbName) - filter := bson.D{ - {"$or", []bson.D{ - util.ExcludePrefixesQuery( - "name", - mslices.Of(ExcludedSystemCollPrefix), - ), - { - {"$expr", mmongo.StartsWithAgg("$name", "system.buckets.")}, - }, - }}, - } + filter := util.ExcludePrefixesQuery( + "name", + mslices.Of(ExcludedSystemCollPrefix), + ) specifications, err := db.ListCollectionSpecifications(ctx, filter, options.ListCollections().SetNameOnly(true)) if err != nil { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index fb661b0d..79f30fb9 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1124,6 +1124,7 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } task.Status = verificationTaskMetadataMismatch } + if !verifyData { // If the metadata mismatched and we're not checking the actual data, that's a complete failure. if task.Status == verificationTaskMetadataMismatch { From 9fbcd9976c1bd7d4ce4f4f785120cbda1f007eb7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 23:47:43 -0400 Subject: [PATCH 06/23] flexible --- internal/verifier/check.go | 78 +------------------- internal/verifier/migration_verifier.go | 6 +- internal/verifier/nsmap.go | 26 +++++-- internal/verifier/timeseries.go | 97 +++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 89 deletions(-) create mode 100644 internal/verifier/timeseries.go diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 150a7856..f6af9036 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -14,7 +14,6 @@ import ( "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "golang.org/x/exp/slices" ) type GenerationStatus string @@ -487,10 +486,10 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error src, ) } - - verifier.gen0PendingCollectionTasks.Add(1) } + verifier.gen0PendingCollectionTasks.Store(int32(len(verifier.srcNamespaces))) + err = verifier.UpdatePrimaryTaskComplete(ctx) if err != nil { return err @@ -498,79 +497,6 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error return nil } -func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) error { - srcTimeseriesNamespaces, err := whichNamespacesAreTimeseries( - ctx, - verifier.srcClient, - mapset.NewSet(verifier.srcNamespaces...), - ) - if err != nil { - return errors.Wrap(err, "fetching timeseries namespaces") - } - - for _, srcNS := range slices.Clone(verifier.srcNamespaces) { - if !srcTimeseriesNamespaces.Contains(srcNS) { - continue - } - - dstNS, ok := verifier.nsMap.GetDstNamespace(srcNS) - if !ok { - return fmt.Errorf("found no dst namespace for %#q", srcNS) - } - - verifier.srcNamespaces = append( - verifier.srcNamespaces, - "system.buckets."+srcNS, - ) - - verifier.dstNamespaces = append( - verifier.dstNamespaces, - "system.buckets."+dstNS, - ) - } - - return nil -} - -func whichNamespacesAreTimeseries( - ctx context.Context, - client *mongo.Client, - namespaces mapset.Set[string], -) (mapset.Set[string], error) { - tsNamespaces := mapset.NewSet[string]() - - dbNamespaces := map[string]mapset.Set[string]{} - - for ns := range namespaces.Iter() { - db, coll := SplitNamespace(ns) - if set, exists := dbNamespaces[db]; exists { - set.Add(coll) - } else { - dbNamespaces[db] = mapset.NewSet(coll) - } - } - - for db, colls := range dbNamespaces { - specs, err := client.Database(db).ListCollectionSpecifications( - ctx, - bson.D{ - {"type", "timeseries"}, - {"name", bson.D{{"$in", colls.ToSlice()}}}, - }, - ) - - if err != nil { - return nil, errors.Wrapf(err, "listing %#q’s timeseries namespaces", db) - } - - for _, spec := range specs { - tsNamespaces.Add(db + "." + spec.Name) - } - } - - return tsNamespaces, nil -} - func FetchFailedAndIncompleteTasks( ctx context.Context, logger *logger.Logger, diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 79f30fb9..07844579 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -827,11 +827,7 @@ func (verifier *Verifier) compareCollectionSpecifications( switch srcSpec.Type { case "collection": canCompareData = true - case "view": - case "timeseries": - if !verifier.verifyAll { - return nil, false, fmt.Errorf("cannot verify time-series collection (%#q) under namespace filtering", srcNs) - } + case "view", "timeseries": default: return nil, false, fmt.Errorf("unrecognized collection type (spec: %+v)", srcSpec) } diff --git a/internal/verifier/nsmap.go b/internal/verifier/nsmap.go index ebf52777..bb2d16c9 100644 --- a/internal/verifier/nsmap.go +++ b/internal/verifier/nsmap.go @@ -1,5 +1,7 @@ package verifier +import "fmt" + type NSMap struct { srcDstNsMap map[string]string dstSrcNsMap map[string]string @@ -18,15 +20,9 @@ func (nsmap *NSMap) PopulateWithNamespaces(srcNamespaces []string, dstNamespaces } for i, srcNs := range srcNamespaces { - dstNs := dstNamespaces[i] - if _, exist := nsmap.srcDstNsMap[srcNs]; exist { - panic("another mapping already exists for source namespace " + srcNs) - } - if _, exist := nsmap.dstSrcNsMap[dstNs]; exist { - panic("another mapping already exists for destination namespace " + dstNs) + if err := nsmap.Augment(srcNs, dstNamespaces[i]); err != nil { + panic(err.Error()) } - nsmap.srcDstNsMap[srcNs] = dstNs - nsmap.dstSrcNsMap[dstNs] = srcNs } } @@ -38,6 +34,20 @@ func (nsmap *NSMap) Len() int { return len(nsmap.srcDstNsMap) } +func (nsmap *NSMap) Augment(srcNs, dstNs string) error { + if _, exist := nsmap.srcDstNsMap[srcNs]; exist { + return fmt.Errorf("another mapping already exists for source namespace %#q", srcNs) + } + if _, exist := nsmap.dstSrcNsMap[dstNs]; exist { + return fmt.Errorf("another mapping already exists for destination namespace %#q", dstNs) + } + + nsmap.srcDstNsMap[srcNs] = dstNs + nsmap.dstSrcNsMap[dstNs] = srcNs + + return nil +} + func (nsmap *NSMap) GetDstNamespace(srcNamespace string) (string, bool) { ns, ok := nsmap.srcDstNsMap[srcNamespace] return ns, ok diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go new file mode 100644 index 00000000..7dd48902 --- /dev/null +++ b/internal/verifier/timeseries.go @@ -0,0 +1,97 @@ +package verifier + +import ( + "context" + "fmt" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "golang.org/x/exp/slices" +) + +func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) error { + srcTimeseriesNamespaces, err := whichNamespacesAreTimeseries( + ctx, + verifier.srcClient, + mapset.NewSet(verifier.srcNamespaces...), + ) + if err != nil { + return errors.Wrap(err, "fetching timeseries namespaces") + } + + for _, srcNS := range slices.Clone(verifier.srcNamespaces) { + if !srcTimeseriesNamespaces.Contains(srcNS) { + continue + } + + dstNS, ok := verifier.nsMap.GetDstNamespace(srcNS) + if !ok { + return fmt.Errorf("found no dst namespace for %#q", srcNS) + } + + srcBuckets := "system.buckets." + srcNS + dstBuckets := "system.buckets." + dstNS + + verifier.srcNamespaces = append( + verifier.srcNamespaces, + srcBuckets, + ) + + verifier.dstNamespaces = append( + verifier.dstNamespaces, + dstBuckets, + ) + + if err := verifier.nsMap.Augment(srcBuckets, dstBuckets); err != nil { + return errors.Wrapf( + err, + "adding %#q -> %#q to internal namespace map", + srcBuckets, + dstBuckets, + ) + } + } + + return nil +} + +func whichNamespacesAreTimeseries( + ctx context.Context, + client *mongo.Client, + namespaces mapset.Set[string], +) (mapset.Set[string], error) { + tsNamespaces := mapset.NewSet[string]() + + dbNamespaces := map[string]mapset.Set[string]{} + + for ns := range namespaces.Iter() { + db, coll := SplitNamespace(ns) + if set, exists := dbNamespaces[db]; exists { + set.Add(coll) + } else { + dbNamespaces[db] = mapset.NewSet(coll) + } + } + + for db, colls := range dbNamespaces { + specs, err := client.Database(db).ListCollectionSpecifications( + ctx, + bson.D{ + {"type", "timeseries"}, + {"name", bson.D{{"$in", colls.ToSlice()}}}, + }, + ) + + if err != nil { + return nil, errors.Wrapf(err, "listing %#q’s timeseries namespaces", db) + } + + for _, spec := range specs { + tsNamespaces.Add(db + "." + spec.Name) + } + } + + return tsNamespaces, nil +} From 931553afbc3ad52131aab419878ce0d8457a3bf8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Wed, 17 Sep 2025 23:50:51 -0400 Subject: [PATCH 07/23] mment --- internal/verifier/migration_verifier.go | 1 - internal/verifier/timeseries.go | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 07844579..a32c80f8 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1120,7 +1120,6 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } task.Status = verificationTaskMetadataMismatch } - if !verifyData { // If the metadata mismatched and we're not checking the actual data, that's a complete failure. if task.Status == verificationTaskMetadataMismatch { diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go index 7dd48902..292281b6 100644 --- a/internal/verifier/timeseries.go +++ b/internal/verifier/timeseries.go @@ -11,6 +11,8 @@ import ( "golang.org/x/exp/slices" ) +// This method augments the verifier’s in-memory state to track the +// `system.buckets.*` collection for each timeseries collection. func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) error { srcTimeseriesNamespaces, err := whichNamespacesAreTimeseries( ctx, From 7527fa7d6271b4e702d524e8f7d73a7806a8c1f7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 00:16:35 -0400 Subject: [PATCH 08/23] try 4.2 m debug --- .github/workflows/all.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index ced7c10d..45f4d6fb 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -98,10 +98,12 @@ jobs: - name: Install m and mtools run: |- { - echo npm install -g m + curl -fsSL https://raw.githubusercontent.com/aheckmann/m/master/bin/m > /usr/local/bin/m && chmod +x /usr/local/bin/m echo pipx install 'mtools[all]' } | parallel + - run: export M_DEBUG=1 + - name: Install MongoDB ${{ matrix.mongodb_versions[0] }} (source) run: yes | m ${{ matrix.mongodb_versions[0] }} && dirname $(readlink $(which mongod)) > .srcpath From d6690a4a00e201368211b40e6dd9ae79f2b76414 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 00:44:34 -0400 Subject: [PATCH 09/23] system tevents --- README.md | 10 +++++++--- internal/verifier/change_stream.go | 3 ++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 51ab40ed..6d3201cc 100644 --- a/README.md +++ b/README.md @@ -357,8 +357,12 @@ Additionally, because the amount of data sent to migration-verifier doesn’t ac - If the server’s memory usage rises after generation 0, try reducing `recheckMaxSizeMB`. This will shrink the queries that the verifier sends, which in turn should reduce the server’s memory usage. (The number of actual queries sent will rise, of course.) -# Limitations +## Time-Series Collections + +Because the verifier compares documents by `_id`, it cannot compare logical time-series measurements (i.e., the data that users actually insert). Instead it compares the server’s internal time-series “buckets”. Unfortunately, this makes mismatch details essentially useless with time-series since they will be details about time-series buckets, which users generally don’t see. -- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event, the verification will fail. +NB: Given bucket documents’ size, hashed document comparison can be especially useful with time-series. + +# Limitations -- The verifier crashes if it tries to compare time-series collections. The error will include a phrase like “Collection has nil UUID (most probably is a view)” and also mention “timeseries”. +- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event from the source, the verification will fail permanently. diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 6d2ca2b2..c45f6155 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -619,7 +619,8 @@ func (csr *ChangeStreamReader) createChangeStream( ) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). - SetMaxAwaitTime(maxChangeStreamAwaitTime) + SetMaxAwaitTime(maxChangeStreamAwaitTime). + SetCustom(bson.M{"showSystemEvents": true}) if csr.clusterInfo.VersionArray[0] >= 6 { opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true}) From 78285b3c6bdd197e7992ddc86513134ad25136af Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 00:58:57 -0400 Subject: [PATCH 10/23] wrong option --- internal/verifier/change_stream.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index c45f6155..6a8e2d42 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -619,11 +619,15 @@ func (csr *ChangeStreamReader) createChangeStream( ) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). - SetMaxAwaitTime(maxChangeStreamAwaitTime). - SetCustom(bson.M{"showSystemEvents": true}) + SetMaxAwaitTime(maxChangeStreamAwaitTime) if csr.clusterInfo.VersionArray[0] >= 6 { - opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true}) + opts = opts.SetCustomPipeline( + bson.M{ + "showSystemEvents": true, + "showExpandedEvents": true, + }, + ) } savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) From 416036b672c5feb0abbd1c87e94024889e2bfc62 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 01:06:13 -0400 Subject: [PATCH 11/23] fix nsmap handling --- internal/verifier/timeseries.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go index 292281b6..b4463174 100644 --- a/internal/verifier/timeseries.go +++ b/internal/verifier/timeseries.go @@ -28,9 +28,16 @@ func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) continue } - dstNS, ok := verifier.nsMap.GetDstNamespace(srcNS) - if !ok { - return fmt.Errorf("found no dst namespace for %#q", srcNS) + var dstNS string + + if verifier.nsMap.Len() == 0 { + dstNS = srcNS + } else { + var ok bool + dstNS, ok = verifier.nsMap.GetDstNamespace(srcNS) + if !ok { + return fmt.Errorf("found no dst namespace for %#q", srcNS) + } } srcBuckets := "system.buckets." + srcNS @@ -46,13 +53,15 @@ func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) dstBuckets, ) - if err := verifier.nsMap.Augment(srcBuckets, dstBuckets); err != nil { - return errors.Wrapf( - err, - "adding %#q -> %#q to internal namespace map", - srcBuckets, - dstBuckets, - ) + if verifier.nsMap.Len() > 0 { + if err := verifier.nsMap.Augment(srcBuckets, dstBuckets); err != nil { + return errors.Wrapf( + err, + "adding %#q -> %#q to internal namespace map", + srcBuckets, + dstBuckets, + ) + } } } From 50af9e0f97768cec67798b2daae081f01c4932ae Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 01:07:46 -0400 Subject: [PATCH 12/23] fix ns --- internal/verifier/timeseries.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go index b4463174..4533e81f 100644 --- a/internal/verifier/timeseries.go +++ b/internal/verifier/timeseries.go @@ -40,8 +40,11 @@ func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) } } - srcBuckets := "system.buckets." + srcNS - dstBuckets := "system.buckets." + dstNS + srcDB, srcColl := SplitNamespace(srcNS) + srcBuckets := srcDB + ".system.buckets." + srcColl + + dstDB, dstColl := SplitNamespace(dstNS) + dstBuckets := dstDB + ".system.buckets." + dstColl verifier.srcNamespaces = append( verifier.srcNamespaces, From f9504a7c13e0c6ce2f51b0f5698e2856411beff2 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 01:37:51 -0400 Subject: [PATCH 13/23] skip 4.2 in CI --- .github/workflows/all.yml | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index 45f4d6fb..43067d40 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -14,13 +14,13 @@ jobs: matrix: include: - # Testing fallback when `hello` isn’t implemented - # (but appendOplogNote is). - - mongodb_versions: [ '4.2.5', '6.0' ] - topology: - name: replset - srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 - dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 +# # Testing fallback when `hello` isn’t implemented +# # (but appendOplogNote is). +# - mongodb_versions: [ '4.2.5', '6.0' ] +# topology: +# name: replset +# srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 +# dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 exclude: - mongodb_versions: [ '4.2', '4.2' ] @@ -34,10 +34,10 @@ jobs: # versions are: source, destination mongodb_versions: - - [ '4.2', '4.2' ] - - [ '4.2', '4.4' ] - - [ '4.2', '5.0' ] - - [ '4.2', '6.0' ] +# - [ '4.2', '4.2' ] +# - [ '4.2', '4.4' ] +# - [ '4.2', '5.0' ] +# - [ '4.2', '6.0' ] - [ '4.4', '4.4' ] - [ '4.4', '5.0' ] @@ -98,12 +98,10 @@ jobs: - name: Install m and mtools run: |- { - curl -fsSL https://raw.githubusercontent.com/aheckmann/m/master/bin/m > /usr/local/bin/m && chmod +x /usr/local/bin/m + echo npm install -g m echo pipx install 'mtools[all]' } | parallel - - run: export M_DEBUG=1 - - name: Install MongoDB ${{ matrix.mongodb_versions[0] }} (source) run: yes | m ${{ matrix.mongodb_versions[0] }} && dirname $(readlink $(which mongod)) > .srcpath From 83a91c0b67a0ab57bb3946a16cd107bafb3b576c Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 02:18:41 -0400 Subject: [PATCH 14/23] add test --- internal/verifier/timeseries_test.go | 112 +++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 internal/verifier/timeseries_test.go diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go new file mode 100644 index 00000000..065c42c7 --- /dev/null +++ b/internal/verifier/timeseries_test.go @@ -0,0 +1,112 @@ +package verifier + +import ( + "testing" + "time" + + "github.com/10gen/migration-verifier/mslices" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { + ctx := suite.Context() + + if suite.BuildVerifier().srcClusterInfo.VersionArray[0] < 6 { + suite.T().Skipf("Need a source version with time-series support.") + } + + dbName := suite.DBNameForTest() + now := time.Now() + + for _, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { + suite.Require().NoError( + client.Database(dbName).CreateCollection( + ctx, + "weather", + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("metadata"), + ), + ), + ) + } + + srcDB := suite.srcMongoClient.Database(dbName) + _, err := srcDB.Collection("weather").InsertOne(ctx, bson.D{ + {"time", now}, + {"metadata", 234.0}, + }) + suite.Require().NoError(err, "should insert measurement") + + copyDocs( + suite.T(), + srcDB.Collection("system.buckets.weather"), + suite.dstMongoClient.Database(dbName).Collection("system.buckets.weather"), + ) + + verifier := suite.BuildVerifier() + verifier.SetVerifyAll(true) + + runner := RunVerifierCheck(ctx, suite.T(), verifier) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + verificationStatus, err := verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + suite.Assert().Equal( + 0, + verificationStatus.FailedTasks, + "should be no failed tasks", + ) + suite.Assert().Equal( + 3, + verificationStatus.CompletedTasks, + "should be completed: view meta, buckets meta, and buckets docs", + ) + + _, err = srcDB.Collection("weather").InsertOne(ctx, bson.D{ + {"time", now}, + {"metadata", 234.0}, + }) + suite.Require().NoError(err, "should insert measurement (dupe)") + + suite.Assert().Eventually( + func() bool { + suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + verificationStatus, err := verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + + return verificationStatus.FailedTasks > 0 + }, + time.Minute, + 50*time.Millisecond, + "uncopied update on source should trigger failure", + ) +} + +func copyDocs( + t *testing.T, + srcColl, dstColl *mongo.Collection, +) { + ctx := t.Context() + + cursor, err := srcColl.Find(ctx, bson.D{}) + require.NoError(t, err, "should open src cursor") + + inserted := 0 + for cursor.Next(ctx) { + _, err := dstColl.InsertOne(ctx, cursor.Current) + require.NoError(t, err, "should insert on dst") + + inserted++ + } + + require.NoError(t, cursor.Err(), "should iterate src cursor") + + t.Logf("Copied %d docs %#q -> %#q", inserted, FullName(srcColl), FullName(dstColl)) +} From 1a98c9cd7a5b7bc4b80adcb91d7054cc5ae75e55 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 02:25:24 -0400 Subject: [PATCH 15/23] dedupe --- internal/verifier/timeseries_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go index 065c42c7..250abb00 100644 --- a/internal/verifier/timeseries_test.go +++ b/internal/verifier/timeseries_test.go @@ -19,6 +19,7 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { } dbName := suite.DBNameForTest() + collName := "weather" now := time.Now() for _, client := range mslices.Of(suite.srcMongoClient, suite.dstMongoClient) { @@ -36,7 +37,7 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { } srcDB := suite.srcMongoClient.Database(dbName) - _, err := srcDB.Collection("weather").InsertOne(ctx, bson.D{ + _, err := srcDB.Collection(collName).InsertOne(ctx, bson.D{ {"time", now}, {"metadata", 234.0}, }) @@ -44,8 +45,8 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { copyDocs( suite.T(), - srcDB.Collection("system.buckets.weather"), - suite.dstMongoClient.Database(dbName).Collection("system.buckets.weather"), + srcDB.Collection("system.buckets."+collName), + suite.dstMongoClient.Database(dbName).Collection("system.buckets."+collName), ) verifier := suite.BuildVerifier() From 37365b710835db72a10ec69cf9a6350b94eb8203 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 09:13:43 -0400 Subject: [PATCH 16/23] Revert to simpler, non-NS-filter-compatible. --- .github/workflows/all.yml | 22 ++--- README.md | 3 + internal/verifier/check.go | 8 +- internal/verifier/list_namespaces.go | 25 ++++-- internal/verifier/migration_verifier.go | 6 +- internal/verifier/nsmap.go | 26 ++---- internal/verifier/timeseries.go | 111 ------------------------ 7 files changed, 45 insertions(+), 156 deletions(-) delete mode 100644 internal/verifier/timeseries.go diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index 43067d40..ced7c10d 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -14,13 +14,13 @@ jobs: matrix: include: -# # Testing fallback when `hello` isn’t implemented -# # (but appendOplogNote is). -# - mongodb_versions: [ '4.2.5', '6.0' ] -# topology: -# name: replset -# srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 -# dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 + # Testing fallback when `hello` isn’t implemented + # (but appendOplogNote is). + - mongodb_versions: [ '4.2.5', '6.0' ] + topology: + name: replset + srcConnStr: mongodb://localhost:27020,localhost:27021,localhost:27022 + dstConnStr: mongodb://localhost:27030,localhost:27031,localhost:27032 exclude: - mongodb_versions: [ '4.2', '4.2' ] @@ -34,10 +34,10 @@ jobs: # versions are: source, destination mongodb_versions: -# - [ '4.2', '4.2' ] -# - [ '4.2', '4.4' ] -# - [ '4.2', '5.0' ] -# - [ '4.2', '6.0' ] + - [ '4.2', '4.2' ] + - [ '4.2', '4.4' ] + - [ '4.2', '5.0' ] + - [ '4.2', '6.0' ] - [ '4.4', '4.4' ] - [ '4.4', '5.0' ] diff --git a/README.md b/README.md index 6d3201cc..d0fee227 100644 --- a/README.md +++ b/README.md @@ -366,3 +366,6 @@ NB: Given bucket documents’ size, hashed document comparison can be especially # Limitations - The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event from the source, the verification will fail permanently. + +- The verifier cannot verify time-series collections under namespace + filtering. diff --git a/internal/verifier/check.go b/internal/verifier/check.go index f6af9036..0bee5472 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -459,7 +459,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error } isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx) if err != nil { - return errors.Wrap(err, "creating primary task") + return return errors.Wrap(err, "creating primary task") } if !isPrimary { verifier.logger.Info().Msg("Primary task already existed; skipping setup") @@ -471,12 +471,6 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error return errors.Wrap(err, "creating namespace list") } } - - err = verifier.addTimeseriesBucketsToNamespaces(ctx) - if err != nil { - return errors.Wrap(err, "adding timeseries buckets to namespaces") - } - for _, src := range verifier.srcNamespaces { _, err := verifier.InsertCollectionVerificationTask(ctx, src) if err != nil { diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index 98389969..9437d661 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -5,6 +5,7 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mmongo" "github.com/10gen/migration-verifier/mslices" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -27,11 +28,12 @@ var ( ExcludedSystemCollPrefix = "system." ) -// ListAllUserNamespaces lists all the user collections on a cluster. -// Unlike mongosync, we don't use the internal $listCatalog, since we need to -// work on old versions without that command. +// ListAllUserNamespaces lists all the user collections on a cluster, +// in addition to time-series “system.buckets.*” collections. // -// This means this does *NOT* run with read concern majority. +// Unlike mongosync, we don't use the internal $listCatalog, since we need to +// work on old versions without that command. Thus, this does *NOT* run with +// majority read concern. func ListAllUserNamespaces( ctx context.Context, logger *logger.Logger, @@ -65,10 +67,17 @@ func ListAllUserNamespaces( for _, dbName := range dbNames { db := client.Database(dbName) - filter := util.ExcludePrefixesQuery( - "name", - mslices.Of(ExcludedSystemCollPrefix), - ) + filter := bson.D{ + {"$or", []bson.D{ + util.ExcludePrefixesQuery( + "name", + mslices.Of(ExcludedSystemCollPrefix), + ), + { + {"$expr", mmongo.StartsWithAgg("$name", timeseriesBucketsPrefix)}, + }, + }}, + } specifications, err := db.ListCollectionSpecifications(ctx, filter, options.ListCollections().SetNameOnly(true)) if err != nil { diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index a32c80f8..fb661b0d 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -827,7 +827,11 @@ func (verifier *Verifier) compareCollectionSpecifications( switch srcSpec.Type { case "collection": canCompareData = true - case "view", "timeseries": + case "view": + case "timeseries": + if !verifier.verifyAll { + return nil, false, fmt.Errorf("cannot verify time-series collection (%#q) under namespace filtering", srcNs) + } default: return nil, false, fmt.Errorf("unrecognized collection type (spec: %+v)", srcSpec) } diff --git a/internal/verifier/nsmap.go b/internal/verifier/nsmap.go index bb2d16c9..ebf52777 100644 --- a/internal/verifier/nsmap.go +++ b/internal/verifier/nsmap.go @@ -1,7 +1,5 @@ package verifier -import "fmt" - type NSMap struct { srcDstNsMap map[string]string dstSrcNsMap map[string]string @@ -20,9 +18,15 @@ func (nsmap *NSMap) PopulateWithNamespaces(srcNamespaces []string, dstNamespaces } for i, srcNs := range srcNamespaces { - if err := nsmap.Augment(srcNs, dstNamespaces[i]); err != nil { - panic(err.Error()) + dstNs := dstNamespaces[i] + if _, exist := nsmap.srcDstNsMap[srcNs]; exist { + panic("another mapping already exists for source namespace " + srcNs) + } + if _, exist := nsmap.dstSrcNsMap[dstNs]; exist { + panic("another mapping already exists for destination namespace " + dstNs) } + nsmap.srcDstNsMap[srcNs] = dstNs + nsmap.dstSrcNsMap[dstNs] = srcNs } } @@ -34,20 +38,6 @@ func (nsmap *NSMap) Len() int { return len(nsmap.srcDstNsMap) } -func (nsmap *NSMap) Augment(srcNs, dstNs string) error { - if _, exist := nsmap.srcDstNsMap[srcNs]; exist { - return fmt.Errorf("another mapping already exists for source namespace %#q", srcNs) - } - if _, exist := nsmap.dstSrcNsMap[dstNs]; exist { - return fmt.Errorf("another mapping already exists for destination namespace %#q", dstNs) - } - - nsmap.srcDstNsMap[srcNs] = dstNs - nsmap.dstSrcNsMap[dstNs] = srcNs - - return nil -} - func (nsmap *NSMap) GetDstNamespace(srcNamespace string) (string, bool) { ns, ok := nsmap.srcDstNsMap[srcNamespace] return ns, ok diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go deleted file mode 100644 index 4533e81f..00000000 --- a/internal/verifier/timeseries.go +++ /dev/null @@ -1,111 +0,0 @@ -package verifier - -import ( - "context" - "fmt" - - mapset "github.com/deckarep/golang-set/v2" - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "golang.org/x/exp/slices" -) - -// This method augments the verifier’s in-memory state to track the -// `system.buckets.*` collection for each timeseries collection. -func (verifier *Verifier) addTimeseriesBucketsToNamespaces(ctx context.Context) error { - srcTimeseriesNamespaces, err := whichNamespacesAreTimeseries( - ctx, - verifier.srcClient, - mapset.NewSet(verifier.srcNamespaces...), - ) - if err != nil { - return errors.Wrap(err, "fetching timeseries namespaces") - } - - for _, srcNS := range slices.Clone(verifier.srcNamespaces) { - if !srcTimeseriesNamespaces.Contains(srcNS) { - continue - } - - var dstNS string - - if verifier.nsMap.Len() == 0 { - dstNS = srcNS - } else { - var ok bool - dstNS, ok = verifier.nsMap.GetDstNamespace(srcNS) - if !ok { - return fmt.Errorf("found no dst namespace for %#q", srcNS) - } - } - - srcDB, srcColl := SplitNamespace(srcNS) - srcBuckets := srcDB + ".system.buckets." + srcColl - - dstDB, dstColl := SplitNamespace(dstNS) - dstBuckets := dstDB + ".system.buckets." + dstColl - - verifier.srcNamespaces = append( - verifier.srcNamespaces, - srcBuckets, - ) - - verifier.dstNamespaces = append( - verifier.dstNamespaces, - dstBuckets, - ) - - if verifier.nsMap.Len() > 0 { - if err := verifier.nsMap.Augment(srcBuckets, dstBuckets); err != nil { - return errors.Wrapf( - err, - "adding %#q -> %#q to internal namespace map", - srcBuckets, - dstBuckets, - ) - } - } - } - - return nil -} - -func whichNamespacesAreTimeseries( - ctx context.Context, - client *mongo.Client, - namespaces mapset.Set[string], -) (mapset.Set[string], error) { - tsNamespaces := mapset.NewSet[string]() - - dbNamespaces := map[string]mapset.Set[string]{} - - for ns := range namespaces.Iter() { - db, coll := SplitNamespace(ns) - if set, exists := dbNamespaces[db]; exists { - set.Add(coll) - } else { - dbNamespaces[db] = mapset.NewSet(coll) - } - } - - for db, colls := range dbNamespaces { - specs, err := client.Database(db).ListCollectionSpecifications( - ctx, - bson.D{ - {"type", "timeseries"}, - {"name", bson.D{{"$in", colls.ToSlice()}}}, - }, - ) - - if err != nil { - return nil, errors.Wrapf(err, "listing %#q’s timeseries namespaces", db) - } - - for _, spec := range specs { - tsNamespaces.Add(db + "." + spec.Name) - } - } - - return tsNamespaces, nil -} From c2244f78d93e89d27d80a2c8c8ec3fb057b23892 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 09:22:39 -0400 Subject: [PATCH 17/23] fix --- internal/verifier/check.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 0bee5472..7cfd853b 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -459,7 +459,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error } isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx) if err != nil { - return return errors.Wrap(err, "creating primary task") + return errors.Wrap(err, "creating primary task") } if !isPrimary { verifier.logger.Info().Msg("Primary task already existed; skipping setup") From 406dd17aa3b005216c94f0b3617f704ff9abedd4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 09:34:40 -0400 Subject: [PATCH 18/23] prefix --- internal/verifier/timeseries.go | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 internal/verifier/timeseries.go diff --git a/internal/verifier/timeseries.go b/internal/verifier/timeseries.go new file mode 100644 index 00000000..0d08e93f --- /dev/null +++ b/internal/verifier/timeseries.go @@ -0,0 +1,3 @@ +package verifier + +const timeseriesBucketsPrefix = "system.buckets." From 7a2477fd5db18f7152ef0e507d662bde4c6c2fb1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 16:57:22 -0400 Subject: [PATCH 19/23] normalize index --- internal/verifier/timeseries_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go index 250abb00..08b43b6f 100644 --- a/internal/verifier/timeseries_test.go +++ b/internal/verifier/timeseries_test.go @@ -34,6 +34,20 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { ), ), ) + + // v7+ automatically create this: + _, err := client.Database(dbName).Collection("weather").Indexes(). + CreateOne( + ctx, + mongo.IndexModel{ + Keys: bson.D{ + {"metadata", 1}, + {"time", 1}, + }, + }, + ) + suite.Require().NoError(err, "should create index") + } srcDB := suite.srcMongoClient.Database(dbName) @@ -68,6 +82,8 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { "should be completed: view meta, buckets meta, and buckets docs", ) + suite.T().Logf("verificationStatus") + _, err = srcDB.Collection("weather").InsertOne(ctx, bson.D{ {"time", now}, {"metadata", 234.0}, From ea685f98bf9199826e8124c583df16ef423f6b22 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 18 Sep 2025 19:56:02 -0400 Subject: [PATCH 20/23] more doc updates --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index d0fee227..e1d36848 100644 --- a/README.md +++ b/README.md @@ -361,6 +361,8 @@ Additionally, because the amount of data sent to migration-verifier doesn’t ac Because the verifier compares documents by `_id`, it cannot compare logical time-series measurements (i.e., the data that users actually insert). Instead it compares the server’s internal time-series “buckets”. Unfortunately, this makes mismatch details essentially useless with time-series since they will be details about time-series buckets, which users generally don’t see. +It also requires that migrations replicate the raw buckets rather than the logical measurements. This is because a logical migration would cause `_id` mismatches between source & destination buckets. A user application wouldn’t care (since it never sees the buckets’ `_id`s), but verification does. + NB: Given bucket documents’ size, hashed document comparison can be especially useful with time-series. # Limitations From 08c57b6e90f14422e75e5eae4ed8ec649613fe89 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 19 Sep 2025 10:53:35 -0400 Subject: [PATCH 21/23] add buckets-only test --- internal/verifier/timeseries_test.go | 230 ++++++++++++++++++++++++++- 1 file changed, 227 insertions(+), 3 deletions(-) diff --git a/internal/verifier/timeseries_test.go b/internal/verifier/timeseries_test.go index 08b43b6f..cbdcb49b 100644 --- a/internal/verifier/timeseries_test.go +++ b/internal/verifier/timeseries_test.go @@ -5,12 +5,237 @@ import ( "time" "github.com/10gen/migration-verifier/mslices" + "github.com/samber/lo" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) +// TestTimeSeries_BucketsOnly confirms the verifier’s time-series coverage +// when only the buckets exist. This is important when verifying shard-to-shard. +func (suite *IntegrationTestSuite) TestTimeSeries_BucketsOnly() { + ctx := suite.Context() + + if suite.BuildVerifier().srcClusterInfo.VersionArray[0] < 6 { + suite.T().Skipf("Need a source version with time-series support.") + } + + dbName := suite.DBNameForTest() + db := suite.srcMongoClient.Database(dbName) + collName := "weather" + bucketsCollName := timeseriesBucketsPrefix + collName + + suite.Require().NoError( + db.CreateCollection( + ctx, + collName, + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("sensor"), + ), + ), + ) + + coll := db.Collection(collName) + _, err := coll.InsertOne(ctx, bson.D{ + {"time", time.Now()}, + {"sensor", 1}, + }) + suite.Require().NoError(err, "should insert first measurement") + + _, err = coll.InsertOne(ctx, bson.D{ + {"time", time.Now()}, + {"sensor", 2}, + }) + suite.Require().NoError(err, "should insert second measurement") + + cursor, err := coll.Database().Collection(bucketsCollName). + Find(ctx, bson.D{}) + suite.Require().NoError(err, "should count buckets") + + var buckets []bson.D + suite.Require().NoError(cursor.All(ctx, &buckets)) + suite.Require().Greater(len(buckets), 1, "we need >=1 bucket") + + suite.Require().NoError(coll.Drop(ctx)) + + runVerifier := func() (*Verifier, *CheckRunner, *VerificationStatus) { + verifier := suite.BuildVerifier() + verifier.SetVerifyAll(true) + runner := RunVerifierCheck(ctx, suite.T(), verifier) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + verificationStatus, err := verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + + return verifier, runner, verificationStatus + } + + dstDB := suite.dstMongoClient.Database(dbName) + + suite.Run( + "generation 0", + func() { + // The server forbids creation of a buckets collection without the + // relevant time-series options. + suite.Require().NoError( + db.CreateCollection( + ctx, + bucketsCollName, + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("sensor"), + ), + ), + "should create source buckets collection", + ) + + _, err := db.Collection(bucketsCollName).InsertMany( + ctx, + lo.ToAnySlice(buckets), + ) + suite.Require().NoError(err, "should insert source buckets") + + verifier, runner, verificationStatus := runVerifier() + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(runner.Await()) + suite.Require().NoError(verifier.verificationDatabase().Drop(ctx)) + + suite.Assert().NotZero( + verificationStatus.FailedTasks, + "missing buckets collection should show mismatch (status: %+v)", + verificationStatus, + ) + + suite.Require().NoError( + dstDB.CreateCollection( + ctx, + bucketsCollName, + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("sensor"), + ), + ), + "should create destination buckets collection", + ) + + _, err = dstDB. + Collection(bucketsCollName). + InsertOne(ctx, buckets[0]) + suite.Require().NoError(err) + + verifier, runner, verificationStatus = runVerifier() + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(runner.Await()) + suite.Require().NoError(verifier.verificationDatabase().Drop(ctx)) + + suite.Assert().NotZero( + verificationStatus.FailedTasks, + "1 bucket missing should show mismatch", + ) + + _, err = suite.dstMongoClient. + Database(dbName). + Collection(bucketsCollName). + InsertOne(ctx, buckets[1]) + suite.Require().NoError(err) + + verifier, runner, verificationStatus = runVerifier() + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Require().NoError(runner.Await()) + suite.Require().NoError(verifier.verificationDatabase().Drop(ctx)) + + suite.Assert().Zero( + verificationStatus.FailedTasks, + "if both buckets exist there should be no mismatch", + ) + }, + ) + + suite.Require().NoError(db.Collection(bucketsCollName).Drop(ctx)) + suite.Require().NoError(dstDB.Collection(bucketsCollName).Drop(ctx)) + + suite.Run( + "missing bucket gets added during recheck", + func() { + suite.Require().NoError( + db.CreateCollection( + ctx, + bucketsCollName, + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("sensor"), + ), + ), + "should create source buckets collection", + ) + + _, err := db.Collection(bucketsCollName).InsertMany( + ctx, + lo.ToAnySlice(buckets), + ) + suite.Require().NoError(err) + + suite.Require().NoError( + dstDB.CreateCollection( + ctx, + bucketsCollName, + options.CreateCollection().SetTimeSeriesOptions( + options.TimeSeries(). + SetTimeField("time"). + SetMetaField("sensor"), + ), + ), + "should create destination buckets collection", + ) + + _, err = suite.dstMongoClient. + Database(dbName). + Collection(bucketsCollName). + InsertOne(ctx, buckets[0]) + suite.Require().NoError(err) + + verifier, runner, verificationStatus := runVerifier() + defer func() { + suite.Require().NoError(verifier.WritesOff(ctx)) + suite.Assert().NoError(runner.Await()) + suite.Require().NoError(verifier.verificationDatabase().Drop(ctx)) + }() + + suite.Assert().NotZero( + verificationStatus.FailedTasks, + "1 bucket missing should show mismatch", + ) + + _, err = suite.dstMongoClient. + Database(dbName). + Collection(bucketsCollName). + InsertOne(ctx, buckets[1]) + suite.Require().NoError(err) + + suite.Assert().Eventually( + func() bool { + suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) + + verificationStatus, err := verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + + return verificationStatus.FailedTasks == 0 + }, + time.Minute, + time.Second, + "eventually verifier should see that buckets now match", + ) + }, + ) +} + func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { ctx := suite.Context() @@ -26,7 +251,7 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { suite.Require().NoError( client.Database(dbName).CreateCollection( ctx, - "weather", + collName, options.CreateCollection().SetTimeSeriesOptions( options.TimeSeries(). SetTimeField("time"). @@ -35,7 +260,7 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { ), ) - // v7+ automatically create this: + // v7+ automatically creates this: _, err := client.Database(dbName).Collection("weather").Indexes(). CreateOne( ctx, @@ -47,7 +272,6 @@ func (suite *IntegrationTestSuite) TestTimeSeries_Simple() { }, ) suite.Require().NoError(err, "should create index") - } srcDB := suite.srcMongoClient.Database(dbName) From 5091ec6ae56646a9f2f87ef572627679fbaea3fd Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 19 Sep 2025 10:56:43 -0400 Subject: [PATCH 22/23] tweak README --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index e1d36848..9dd3bc37 100644 --- a/README.md +++ b/README.md @@ -369,5 +369,4 @@ NB: Given bucket documents’ size, hashed document comparison can be especially - The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event from the source, the verification will fail permanently. -- The verifier cannot verify time-series collections under namespace - filtering. +- The verifier cannot verify time-series collections under namespace filtering. From 3742d8138c58315766edb3e7e2bc098e4fe870fe Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Mon, 22 Sep 2025 15:08:46 -0400 Subject: [PATCH 23/23] explicit false --- internal/verifier/migration_verifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index fb661b0d..92ebcb6c 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -822,7 +822,7 @@ func (verifier *Verifier) compareCollectionSpecifications( } } - var canCompareData bool + canCompareData := false switch srcSpec.Type { case "collection":