Skip to content

Commit 6f81c0c

Browse files
committed
Persist doc IDs as RawValue rather than any.
This makes the document IDs easier to reason about and sets up further optimizations.
1 parent 31b4830 commit 6f81c0c

File tree

8 files changed

+117
-55
lines changed

8 files changed

+117
-55
lines changed

dockey/agg.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616
// the returned key may not always match the change stream’s `documentKey`
1717
// (because the server misreports its own sharding logic).
1818
func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D {
19-
assertFieldNameUniqueness(fieldNames)
20-
2119
return bson.D{
2220
{"$arrayToObject", mslices.Of(lo.Map(
2321
fieldNames,

internal/verifier/change_stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var supportedEventOpTypes = mapset.NewSet(
4444
type ParsedEvent struct {
4545
OpType string `bson:"operationType"`
4646
Ns *Namespace `bson:"ns,omitempty"`
47-
DocID any `bson:"_docID,omitempty"`
47+
DocID bson.RawValue `bson:"_docID,omitempty"`
4848
FullDocument bson.Raw `bson:"fullDocument,omitempty"`
4949
FullDocLen option.Option[types.ByteCount] `bson:"_fullDocLen"`
5050
ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"`
@@ -187,7 +187,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch
187187

188188
dbNames := make([]string, len(batch.events))
189189
collNames := make([]string, len(batch.events))
190-
docIDs := make([]any, len(batch.events))
190+
docIDs := make([]bson.RawValue, len(batch.events))
191191
dataSizes := make([]int, len(batch.events))
192192

193193
latestTimestamp := primitive.Timestamp{}

internal/verifier/migration_verifier.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -455,16 +455,20 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
455455
return append(predicates, verifier.globalFilter)
456456
}
457457

458-
func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id any, fieldPrefix string) (results []VerificationResult) {
458+
func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id bson.RawValue, fieldPrefix string) (results []VerificationResult) {
459+
results = make(
460+
[]VerificationResult,
461+
0,
462+
len(mismatch.missingFieldOnSrc)+len(mismatch.missingFieldOnDst)+len(mismatch.fieldContentsDiffer),
463+
)
459464

460465
for _, field := range mismatch.missingFieldOnSrc {
461466
result := VerificationResult{
462467
Field: fieldPrefix + field,
463468
Details: Missing,
464469
Cluster: ClusterSource,
465-
NameSpace: namespace}
466-
if id != nil {
467-
result.ID = id
470+
NameSpace: namespace,
471+
ID: id,
468472
}
469473

470474
results = append(results, result)
@@ -475,9 +479,8 @@ func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo
475479
Field: fieldPrefix + field,
476480
Details: Missing,
477481
Cluster: ClusterTarget,
478-
NameSpace: namespace}
479-
if id != nil {
480-
result.ID = id
482+
NameSpace: namespace,
483+
ID: id,
481484
}
482485

483486
results = append(results, result)
@@ -502,9 +505,8 @@ func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo
502505
Field: fieldPrefix + field,
503506
Details: details,
504507
Cluster: ClusterTarget,
505-
NameSpace: namespace}
506-
if id != nil {
507-
result.ID = id
508+
NameSpace: namespace,
509+
ID: id,
508510
}
509511

510512
results = append(results, result)
@@ -564,11 +566,11 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
564566
Int("mismatchesCount", len(problems)).
565567
Msg("Discrepancies found. Will recheck in the next generation.")
566568

567-
var dataSizes []int
569+
dataSizes := make([]int, 0, len(problems))
568570

569571
// This stores all IDs for the next generation to check.
570572
// Its length should equal len(mismatches) + len(missingIds).
571-
var idsToRecheck []any
573+
idsToRecheck := make([]bson.RawValue, 0, len(problems))
572574

573575
for _, mismatch := range problems {
574576
idsToRecheck = append(idsToRecheck, mismatch.ID)
@@ -818,7 +820,7 @@ func (verifier *Verifier) compareCollectionSpecifications(
818820
Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options),
819821
})
820822
} else {
821-
results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, "spec", "Options.")...)
823+
results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, mbson.ToRawValue("spec"), "Options.")...)
822824
}
823825
}
824826

@@ -1005,7 +1007,7 @@ func (verifier *Verifier) verifyIndexes(
10051007

10061008
if !theyMatch {
10071009
results = append(results, VerificationResult{
1008-
ID: indexName,
1010+
ID: mbson.ToRawValue(indexName),
10091011
Field: "index",
10101012
NameSpace: FullName(dstColl),
10111013
Cluster: ClusterTarget,
@@ -1014,7 +1016,7 @@ func (verifier *Verifier) verifyIndexes(
10141016
}
10151017
} else {
10161018
results = append(results, VerificationResult{
1017-
ID: indexName,
1019+
ID: mbson.ToRawValue(indexName),
10181020
Field: "index",
10191021
Details: Missing,
10201022
Cluster: ClusterSource,
@@ -1027,7 +1029,7 @@ func (verifier *Verifier) verifyIndexes(
10271029
for indexName := range srcMap {
10281030
if !srcMapUsed[indexName] {
10291031
results = append(results, VerificationResult{
1030-
ID: indexName,
1032+
ID: mbson.ToRawValue(indexName),
10311033
Field: "index",
10321034
Details: Missing,
10331035
Cluster: ClusterTarget,

internal/verifier/migration_verifier_test.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/10gen/migration-verifier/internal/testutil"
2626
"github.com/10gen/migration-verifier/internal/types"
2727
"github.com/10gen/migration-verifier/internal/util"
28+
"github.com/10gen/migration-verifier/mbson"
2829
"github.com/10gen/migration-verifier/mslices"
2930
"github.com/cespare/permute/v2"
3031
"github.com/rs/zerolog"
@@ -543,7 +544,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() {
543544
suite.Assert().Regexp(regexp.MustCompile("^"+Mismatch), results[0].Details, "mismatch expected")
544545
suite.Assert().EqualValues(
545546
any(id),
546-
results[0].ID.(bson.RawValue).AsInt64(),
547+
results[0].ID.AsInt64(),
547548
"mismatch recorded as expeceted",
548549
)
549550

@@ -562,7 +563,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() {
562563
suite.Assert().Regexp(regexp.MustCompile("^"+Mismatch), results[0].Details, "mismatch expeceted")
563564
suite.Assert().EqualValues(
564565
any(id),
565-
results[0].ID.(bson.RawValue).AsInt64(),
566+
results[0].ID.AsInt64(),
566567
"mismatch recorded as expeceted",
567568
)
568569
}
@@ -684,7 +685,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()
684685
events: []ParsedEvent{{
685686
OpType: "insert",
686687
Ns: &Namespace{DB: "mydb", Coll: "coll2"},
687-
DocID: "heyhey",
688+
DocID: mbson.ToRawValue("heyhey"),
688689
ClusterTime: &primitive.Timestamp{
689690
T: uint32(time.Now().Unix()),
690691
},
@@ -700,7 +701,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()
700701
events: []ParsedEvent{{
701702
OpType: "insert",
702703
Ns: &Namespace{DB: "mydb", Coll: "coll1"},
703-
DocID: "hoohoo",
704+
DocID: mbson.ToRawValue("hoohoo"),
704705
ClusterTime: &primitive.Timestamp{
705706
T: uint32(time.Now().Unix()),
706707
},
@@ -935,14 +936,29 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() {
935936
func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
936937
ctx := suite.Context()
937938
verifier := suite.BuildVerifier()
938-
err := verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []any{42}, []int{100})
939+
err := verifier.InsertFailedCompareRecheckDocs(
940+
ctx,
941+
"foo.bar",
942+
mslices.Of(mbson.ToRawValue(42)),
943+
[]int{100},
944+
)
939945
suite.Require().NoError(err)
940-
err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []any{43, 44}, []int{100, 100})
946+
err = verifier.InsertFailedCompareRecheckDocs(
947+
ctx,
948+
"foo.bar",
949+
mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)),
950+
[]int{100, 100},
951+
)
941952
suite.Require().NoError(err)
942-
err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar2", []any{42}, []int{100})
953+
err = verifier.InsertFailedCompareRecheckDocs(
954+
ctx,
955+
"foo.bar2",
956+
mslices.Of(mbson.ToRawValue(42)),
957+
[]int{100},
958+
)
943959
suite.Require().NoError(err)
944960
event := ParsedEvent{
945-
DocID: int32(55),
961+
DocID: mbson.ToRawValue(int32(55)),
946962
OpType: "delete",
947963
Ns: &Namespace{
948964
DB: "foo",
@@ -1056,7 +1072,7 @@ func TestVerifierCompareDocs(t *testing.T) {
10561072
compareFn: func(t *testing.T, mismatchResults []VerificationResult) {
10571073
if assert.Equal(t, 1, len(mismatchResults)) {
10581074
var res int
1059-
require.Nil(t, mismatchResults[0].ID.(bson.RawValue).Unmarshal(&res))
1075+
require.Nil(t, mismatchResults[0].ID.Unmarshal(&res))
10601076
assert.Equal(t, id, res)
10611077
assert.Regexp(t, regexp.MustCompile("^"+Mismatch), mismatchResults[0].Details)
10621078
}
@@ -1074,7 +1090,7 @@ func TestVerifierCompareDocs(t *testing.T) {
10741090
compareFn: func(t *testing.T, mismatchResults []VerificationResult) {
10751091
if assert.Equal(t, 1, len(mismatchResults)) {
10761092
var res int
1077-
require.Nil(t, mismatchResults[0].ID.(bson.RawValue).Unmarshal(&res))
1093+
require.Nil(t, mismatchResults[0].ID.Unmarshal(&res))
10781094
assert.Equal(t, id, res)
10791095
assert.Regexp(t, regexp.MustCompile("^"+Mismatch), mismatchResults[0].Details)
10801096
}
@@ -1620,7 +1636,7 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() {
16201636
failures = suite.getFailuresForTask(verifier, task.PrimaryKey)
16211637
if suite.Equal(2, len(failures)) {
16221638
sort.Slice(failures, func(i, j int) bool {
1623-
return failures[i].ID.(string) < failures[j].ID.(string)
1639+
return failures[i].ID.StringValue() < failures[j].ID.StringValue()
16241640
})
16251641
suite.Equal(dstIndexNames[1], failures[0].ID)
16261642
suite.Equal(Missing, failures[0].Details)

internal/verifier/recheck.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/10gen/migration-verifier/internal/retry"
1010
"github.com/10gen/migration-verifier/internal/types"
1111
"github.com/10gen/migration-verifier/internal/util"
12-
"github.com/10gen/migration-verifier/mbson"
1312
"github.com/pkg/errors"
1413
"go.mongodb.org/mongo-driver/bson"
1514
"go.mongodb.org/mongo-driver/bson/bsontype"
@@ -50,7 +49,7 @@ type RecheckDoc struct {
5049
// InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check.
5150
func (verifier *Verifier) InsertFailedCompareRecheckDocs(
5251
ctx context.Context,
53-
namespace string, documentIDs []any, dataSizes []int) error {
52+
namespace string, documentIDs []bson.RawValue, dataSizes []int) error {
5453
dbName, collName := SplitNamespace(namespace)
5554

5655
dbNames := make([]string, len(documentIDs))
@@ -77,7 +76,7 @@ func (verifier *Verifier) insertRecheckDocs(
7776
ctx context.Context,
7877
dbNames []string,
7978
collNames []string,
80-
documentIDs []any,
79+
documentIDs []bson.RawValue,
8180
dataSizes []int,
8281
) error {
8382
verifier.mux.RLock()
@@ -191,7 +190,7 @@ func (verifier *Verifier) insertRecheckDocs(
191190

192191
func deduplicateRechecks(
193192
dbNames, collNames []string,
194-
documentIDs []any,
193+
documentIDs []bson.RawValue,
195194
dataSizes []int,
196195
) ([]string, []string, []bson.RawValue, []int) {
197196
dedupeMap := map[string]map[string]map[string]int{}
@@ -200,15 +199,13 @@ func deduplicateRechecks(
200199

201200
for i, dbName := range dbNames {
202201
collName := collNames[i]
203-
docID := documentIDs[i]
202+
docIDRaw := documentIDs[i]
204203
dataSize := dataSizes[i]
205204

206-
docIDRaw := mbson.MustConvertToRawValue(docID)
207-
208-
docIDStr := string(append(
209-
[]byte{byte(docIDRaw.Type)},
210-
docIDRaw.Value...,
211-
))
205+
docIDBuf := make([]byte, 1+len(docIDRaw.Value))
206+
docIDBuf[0] = byte(docIDRaw.Type)
207+
copy(docIDBuf[1:], docIDRaw.Value)
208+
docIDStr := string(docIDBuf)
212209

213210
if _, ok := dedupeMap[dbName]; !ok {
214211
dedupeMap[dbName] = map[string]map[string]int{
@@ -251,8 +248,8 @@ func deduplicateRechecks(
251248
rawDocIDs = append(
252249
rawDocIDs,
253250
bson.RawValue{
254-
Type: []bsontype.Type(docIDStr)[0],
255-
Value: []byte(docIDStr)[1:],
251+
Type: bsontype.Type(docIDStr[0]),
252+
Value: []byte(docIDStr[1:]),
256253
},
257254
)
258255
dataSizes = append(dataSizes, dataSize)

internal/verifier/recheck_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/10gen/migration-verifier/internal/testutil"
1010
"github.com/10gen/migration-verifier/internal/types"
11+
"github.com/10gen/migration-verifier/mbson"
1112
"github.com/10gen/migration-verifier/mslices"
1213
"github.com/rs/zerolog"
1314
"github.com/samber/lo"
@@ -24,7 +25,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
2425
verifier.InsertFailedCompareRecheckDocs(
2526
ctx,
2627
"the.namespace",
27-
[]any{"theDocID"},
28+
[]bson.RawValue{mbson.ToRawValue("theDocID")},
2829
[]int{1234},
2930
),
3031
"insert failed-comparison recheck",
@@ -48,7 +49,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
4849

4950
event := ParsedEvent{
5051
OpType: "insert",
51-
DocID: "theDocID",
52+
DocID: mbson.ToRawValue("theDocID"),
5253
Ns: &Namespace{
5354
DB: "the",
5455
Coll: "namespace",
@@ -488,5 +489,16 @@ func insertRecheckDocs(
488489
collNames[i] = collName
489490
}
490491

491-
return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
492+
rawIDs := lo.Map(
493+
documentIDs,
494+
func(idAny any, _ int) bson.RawValue {
495+
btype, buf := lo.Must2(bson.MarshalValue(idAny))
496+
return bson.RawValue{
497+
Type: btype,
498+
Value: buf,
499+
}
500+
},
501+
)
502+
503+
return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes)
492504
}

internal/verifier/result.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package verifier
22

33
import (
44
"github.com/10gen/migration-verifier/option"
5+
"go.mongodb.org/mongo-driver/bson"
56
"go.mongodb.org/mongo-driver/bson/primitive"
67
)
78

@@ -21,7 +22,7 @@ type VerificationResult struct {
2122
// VerificationResult instances might share the same ID. That’s OK,
2223
// though; it’ll just make the recheck include all docs with that ID,
2324
// regardless of which ones actually need the recheck.
24-
ID any
25+
ID bson.RawValue
2526

2627
Field string
2728
Details string

0 commit comments

Comments
 (0)