Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,16 @@ Additionally, because the amount of data sent to migration-verifier doesn’t ac

- If the server’s memory usage rises after generation 0, try reducing `recheckMaxSizeMB`. This will shrink the queries that the verifier sends, which in turn should reduce the server’s memory usage. (The number of actual queries sent will rise, of course.)

## Time-Series Collections

Because the verifier compares documents by `_id`, it cannot compare logical time-series measurements (i.e., the data that users actually insert). Instead it compares the server’s internal time-series “buckets”. Unfortunately, this makes mismatch details essentially useless with time-series since they will be details about time-series buckets, which users generally don’t see.

It also requires that migrations replicate the raw buckets rather than the logical measurements. This is because a logical migration would cause `_id` mismatches between source & destination buckets. A user application wouldn’t care (since it never sees the buckets’ `_id`s), but verification does.

NB: Given bucket documents’ size, hashed document comparison can be especially useful with time-series.

# Limitations

- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event, the verification will fail.
- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event from the source, the verification will fail permanently.

- The verifier crashes if it tries to compare time-series collections. The error will include a phrase like “Collection has nil UUID (most probably is a view)” and also mention “timeseries”.
- The verifier cannot verify time-series collections under namespace filtering.
7 changes: 6 additions & 1 deletion internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,12 @@ func (csr *ChangeStreamReader) createChangeStream(
SetMaxAwaitTime(maxChangeStreamAwaitTime)

if csr.clusterInfo.VersionArray[0] >= 6 {
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
opts = opts.SetCustomPipeline(
bson.M{
"showSystemEvents": true,
"showExpandedEvents": true,
},
)
}

savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
Expand Down
12 changes: 7 additions & 5 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
err = verifier.verificationDatabase().Drop(ctx)
if err != nil {
verifier.mux.Unlock()
return err
return errors.Wrap(err, "dropping metadata")
}
} else {
genOpt, err := verifier.readGeneration(ctx)
if err != nil {
verifier.mux.Unlock()
return err
return errors.Wrap(err, "reading generation from metadata")
}

if gen, has := genOpt.Get(); has {
Expand All @@ -221,6 +221,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
}
}

verifier.logger.Info().Msg("Starting change streams.")

// Now that we’ve initialized verifier.generation we can
// start the change stream readers.
verifier.initializeChangeStreamReaders()
Expand All @@ -230,7 +232,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
func(ctx context.Context, _ *retry.FuncInfo) error {
err = verifier.AddMetaIndexes(ctx)
if err != nil {
return err
return errors.Wrap(err, "adding metadata indexes")
}

err = verifier.doInMetaTransaction(
Expand Down Expand Up @@ -457,7 +459,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error
}
isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx)
if err != nil {
return err
return errors.Wrap(err, "creating primary task")
}
if !isPrimary {
verifier.logger.Info().Msg("Primary task already existed; skipping setup")
Expand All @@ -466,7 +468,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error
if verifier.verifyAll {
err := verifier.setupAllNamespaceList(ctx)
if err != nil {
return err
return errors.Wrap(err, "creating namespace list")
}
}
for _, src := range verifier.srcNamespaces {
Expand Down
24 changes: 18 additions & 6 deletions internal/verifier/list_namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/10gen/migration-verifier/internal/logger"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mmongo"
"github.com/10gen/migration-verifier/mslices"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
Expand All @@ -27,8 +28,12 @@ var (
ExcludedSystemCollPrefix = "system."
)

// Lists all the user collections on a cluster. Unlike mongosync, we don't use the internal $listCatalog, since we need to
// work on old versions without that command. This means this does not run with read concern majority.
// ListAllUserNamespaces lists all the user collections on a cluster,
// in addition to time-series “system.buckets.*” collections.
//
// Unlike mongosync, we don't use the internal $listCatalog, since we need to
// work on old versions without that command. Thus, this does *NOT* run with
// majority read concern.
func ListAllUserNamespaces(
ctx context.Context,
logger *logger.Logger,
Expand Down Expand Up @@ -62,10 +67,17 @@ func ListAllUserNamespaces(
for _, dbName := range dbNames {
db := client.Database(dbName)

filter := util.ExcludePrefixesQuery(
"name",
mslices.Of(ExcludedSystemCollPrefix),
)
filter := bson.D{
{"$or", []bson.D{
util.ExcludePrefixesQuery(
"name",
mslices.Of(ExcludedSystemCollPrefix),
),
{
{"$expr", mmongo.StartsWithAgg("$name", timeseriesBucketsPrefix)},
},
}},
}

specifications, err := db.ListCollectionSpecifications(ctx, filter, options.ListCollections().SetNameOnly(true))
if err != nil {
Expand Down
16 changes: 14 additions & 2 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,20 @@ func (verifier *Verifier) compareCollectionSpecifications(
}
}

// Don't compare view data; they have no data of their own.
canCompareData := srcSpec.Type != "view"
canCompareData := false

switch srcSpec.Type {
case "collection":
canCompareData = true
case "view":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would prefer canCompareData being set to false explicitly.

case "timeseries":
if !verifier.verifyAll {
return nil, false, fmt.Errorf("cannot verify time-series collection (%#q) under namespace filtering", srcNs)
}
default:
return nil, false, fmt.Errorf("unrecognized collection type (spec: %+v)", srcSpec)
}

// Do not compare data between capped and uncapped collections because the partitioning is different.
canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped"))

Expand Down
3 changes: 3 additions & 0 deletions internal/verifier/timeseries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package verifier

const timeseriesBucketsPrefix = "system.buckets."
Loading