@@ -10,10 +10,12 @@ import (
1010 "github.com/10gen/migration-verifier/internal/types"
1111 "github.com/10gen/migration-verifier/internal/util"
1212 "github.com/pkg/errors"
13+ "github.com/samber/lo"
1314 "go.mongodb.org/mongo-driver/bson"
1415 "go.mongodb.org/mongo-driver/bson/bsontype"
1516 "go.mongodb.org/mongo-driver/mongo"
1617 "go.mongodb.org/mongo-driver/mongo/options"
18+ "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
1719)
1820
1921const (
@@ -31,9 +33,22 @@ const (
3133// sorting by _id will guarantee that all rechecks for a given
3234// namespace appear consecutively.
3335type RecheckPrimaryKey struct {
34- SrcDatabaseName string `bson:"db"`
35- SrcCollectionName string `bson:"coll"`
36- DocumentID any `bson:"docID"`
36+ SrcDatabaseName string `bson:"db"`
37+ SrcCollectionName string `bson:"coll"`
38+ DocumentID bson.RawValue `bson:"docID"`
39+ }
40+
41+ var _ bson.Marshaler = & RecheckPrimaryKey {}
42+
43+ func (rk * RecheckPrimaryKey ) MarshalBSON () ([]byte , error ) {
44+ return bsoncore .NewDocumentBuilder ().
45+ AppendString ("db" , rk .SrcDatabaseName ).
46+ AppendString ("coll" , rk .SrcCollectionName ).
47+ AppendValue ("docID" , bsoncore.Value {
48+ Type : rk .DocumentID .Type ,
49+ Data : rk .DocumentID .Value ,
50+ }).
51+ Build (), nil
3752}
3853
3954// RecheckDoc stores the necessary information to know which documents must be rechecked.
@@ -46,6 +61,15 @@ type RecheckDoc struct {
4661 DataSize int `bson:"dataSize"`
4762}
4863
64+ var _ bson.Marshaler = & RecheckDoc {}
65+
66+ func (rd * RecheckDoc ) MarshalBSON () ([]byte , error ) {
67+ return bsoncore .NewDocumentBuilder ().
68+ AppendDocument ("_id" , lo .Must (bson .Marshal (rd .PrimaryKey ))).
69+ AppendInt64 ("dataSize" , int64 (rd .DataSize )).
70+ Build (), nil
71+ }
72+
4973// InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check.
5074func (verifier * Verifier ) InsertFailedCompareRecheckDocs (
5175 ctx context.Context ,
@@ -95,50 +119,16 @@ func (verifier *Verifier) insertRecheckDocs(
95119
96120 genCollection := verifier .getRecheckQueueCollection (generation )
97121
98- var recheckBatches [][]mongo.WriteModel
99- var curRechecks []mongo.WriteModel
100- curBatchSize := 0
101- for i , dbName := range dbNames {
102- recheckDoc := RecheckDoc {
103- PrimaryKey : RecheckPrimaryKey {
104- SrcDatabaseName : dbName ,
105- SrcCollectionName : collNames [i ],
106- DocumentID : rawDocIDs [i ],
107- },
108- DataSize : dataSizes [i ],
109- }
110-
111- recheckRaw , err := bson .Marshal (recheckDoc )
112- if err != nil {
113- return errors .Wrapf (err , "marshaling recheck for %#q" , dbName + "." + collNames [i ])
114- }
115-
116- curRechecks = append (
117- curRechecks ,
118- mongo .NewInsertOneModel ().SetDocument (recheckDoc ),
119- )
120- curBatchSize += len (recheckRaw )
121- if curBatchSize > recheckBatchByteLimit || len (curRechecks ) >= recheckBatchCountLimit {
122- recheckBatches = append (recheckBatches , curRechecks )
123- curRechecks = nil
124- curBatchSize = 0
125- }
126- }
127-
128- if len (curRechecks ) > 0 {
129- recheckBatches = append (recheckBatches , curRechecks )
130- }
131-
132- for _ , models := range recheckBatches {
122+ sendRechecks := func (rechecks []bson.Raw ) {
133123 eg .Go (func () error {
134124
135125 retryer := retry .New ()
136126 err := retryer .WithCallback (
137127 func (retryCtx context.Context , _ * retry.FuncInfo ) error {
138- _ , err := genCollection .BulkWrite (
128+ _ , err := genCollection .InsertMany (
139129 retryCtx ,
140- models ,
141- options .BulkWrite ().SetOrdered (false ),
130+ lo . ToAnySlice ( rechecks ) ,
131+ options .InsertMany ().SetOrdered (false ),
142132 )
143133
144134 // We expect duplicate-key errors from the above because:
@@ -157,20 +147,54 @@ func (verifier *Verifier) insertRecheckDocs(
157147 // and document sizes probably remain stable(-ish) across updates.
158148 err = util .TolerateSimpleDuplicateKeyInBulk (
159149 verifier .logger ,
160- len (models ),
150+ len (rechecks ),
161151 err ,
162152 )
163153
164154 return err
165155 },
166156 "persisting %d recheck(s)" ,
167- len (models ),
157+ len (rechecks ),
168158 ).Run (groupCtx , verifier .logger )
169159
170- return errors .Wrapf (err , "batch of %d rechecks" , len (models ))
160+ return errors .Wrapf (err , "batch of %d rechecks" , len (rechecks ))
171161 })
172162 }
173163
164+ curRechecks := make ([]bson.Raw , 0 , recheckBatchCountLimit )
165+ curBatchBytes := 0
166+ for i , dbName := range dbNames {
167+ recheckDoc := RecheckDoc {
168+ PrimaryKey : RecheckPrimaryKey {
169+ SrcDatabaseName : dbName ,
170+ SrcCollectionName : collNames [i ],
171+ DocumentID : rawDocIDs [i ],
172+ },
173+ DataSize : dataSizes [i ],
174+ }
175+
176+ recheckRaw , err := bson .Marshal (recheckDoc )
177+ if err != nil {
178+ return errors .Wrapf (err , "marshaling recheck for %#q" , dbName + "." + collNames [i ])
179+ }
180+
181+ curRechecks = append (
182+ curRechecks ,
183+ bson .Raw (recheckRaw ),
184+ )
185+
186+ curBatchBytes += len (recheckRaw )
187+ if curBatchBytes > recheckBatchByteLimit || len (curRechecks ) >= recheckBatchCountLimit {
188+ sendRechecks (curRechecks )
189+ curRechecks = make ([]bson.Raw , 0 , recheckBatchCountLimit )
190+ curBatchBytes = 0
191+ }
192+ }
193+
194+ if len (curRechecks ) > 0 {
195+ sendRechecks (curRechecks )
196+ }
197+
174198 if err := eg .Wait (); err != nil {
175199 return errors .Wrapf (
176200 err ,
0 commit comments